gora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ktur...@apache.org
Subject svn commit: r1308477 [1/2] - in /gora/trunk: ./ gora-accumulo/ gora-accumulo/src/ gora-accumulo/src/main/ gora-accumulo/src/main/java/ gora-accumulo/src/main/java/org/ gora-accumulo/src/main/java/org/apache/ gora-accumulo/src/main/java/org/apache/gora/...
Date Mon, 02 Apr 2012 19:15:40 GMT
Author: kturner
Date: Mon Apr  2 19:15:38 2012
New Revision: 1308477

URL: http://svn.apache.org/viewvc?rev=1308477&view=rev
Log:
GORA-65 initial checkin of gora accumulo store

Added:
    gora/trunk/gora-accumulo/
    gora/trunk/gora-accumulo/pom.xml   (with props)
    gora/trunk/gora-accumulo/src/
    gora/trunk/gora-accumulo/src/main/
    gora/trunk/gora-accumulo/src/main/java/
    gora/trunk/gora-accumulo/src/main/java/org/
    gora/trunk/gora-accumulo/src/main/java/org/apache/
    gora/trunk/gora-accumulo/src/main/java/org/apache/gora/
    gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/
    gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/
    gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/BinaryEncoder.java   (with props)
    gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/Encoder.java   (with props)
    gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/HexEncoder.java   (with props)
    gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/SignedBinaryEncoder.java   (with props)
    gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/Utils.java   (with props)
    gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/
    gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloQuery.java   (with props)
    gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloResult.java   (with props)
    gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/
    gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloMapping.java   (with props)
    gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java   (with props)
    gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/util/
    gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/util/FixedByteArrayOutputStream.java   (with props)
    gora/trunk/gora-accumulo/src/test/
    gora/trunk/gora-accumulo/src/test/java/
    gora/trunk/gora-accumulo/src/test/java/org/
    gora/trunk/gora-accumulo/src/test/java/org/apache/
    gora/trunk/gora-accumulo/src/test/java/org/apache/gora/
    gora/trunk/gora-accumulo/src/test/java/org/apache/gora/accumulo/
    gora/trunk/gora-accumulo/src/test/java/org/apache/gora/accumulo/store/
    gora/trunk/gora-accumulo/src/test/java/org/apache/gora/accumulo/store/AccumuloStoreTest.java   (with props)
    gora/trunk/gora-accumulo/src/test/java/org/apache/gora/accumulo/store/PartitionTest.java   (with props)
    gora/trunk/gora-accumulo/src/test/java/org/apache/gora/accumulo/util/
    gora/trunk/gora-accumulo/src/test/java/org/apache/gora/accumulo/util/HexEncoderTest.java   (with props)
    gora/trunk/gora-accumulo/src/test/java/org/apache/gora/accumulo/util/SignedBinaryEncoderTest.java   (with props)
    gora/trunk/gora-accumulo/src/test/resources/
    gora/trunk/gora-accumulo/src/test/resources/gora-accumulo-mapping.xml   (with props)
    gora/trunk/gora-accumulo/src/test/resources/gora.properties   (with props)
Modified:
    gora/trunk/pom.xml

Added: gora/trunk/gora-accumulo/pom.xml
URL: http://svn.apache.org/viewvc/gora/trunk/gora-accumulo/pom.xml?rev=1308477&view=auto
==============================================================================
--- gora/trunk/gora-accumulo/pom.xml (added)
+++ gora/trunk/gora-accumulo/pom.xml Mon Apr  2 19:15:38 2012
@@ -0,0 +1,69 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.gora</groupId>
+		<artifactId>gora</artifactId>
+		<version>0.2-SNAPSHOT</version>
+	</parent>
+	<artifactId>gora-accumulo</artifactId>
+	<packaging>jar</packaging>
+
+	<name>Apache Gora :: Accumulo</name>
+
+    <dependencies>
+        <!-- Gora Internal Dependencies -->
+        <dependency>
+            <groupId>org.apache.gora</groupId>
+            <artifactId>gora-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.gora</groupId>
+            <artifactId>gora-core</artifactId>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <!--Accumulo Dependency -->
+        <dependency>
+           <groupId>org.apache.accumulo</groupId>
+           <artifactId>accumulo-core</artifactId>
+           <version>1.4.0</version>
+        </dependency>
+
+
+        <!-- Hadoop Dependencies -->
+        
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>avro</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+	     <exclusions>
+	       <exclusion>
+                <groupId>javax.jms</groupId>
+	        <artifactId>jms</artifactId>
+	      </exclusion>
+            </exclusions>
+
+        </dependency>
+
+        <!-- Testing Dependencies -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-test</artifactId>
+        </dependency>
+
+    </dependencies>
+
+</project>

Propchange: gora/trunk/gora-accumulo/pom.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Added: gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/BinaryEncoder.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/BinaryEncoder.java?rev=1308477&view=auto
==============================================================================
--- gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/BinaryEncoder.java (added)
+++ gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/BinaryEncoder.java Mon Apr  2 19:15:38 2012
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.accumulo.encoders;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.gora.accumulo.util.FixedByteArrayOutputStream;
+
+/**
+ * 
+ */
+public class BinaryEncoder implements Encoder {
+  public byte[] encodeShort(short s) {
+    return encodeShort(s, new byte[2]);
+  }
+  
+  public byte[] encodeShort(short s, byte ret[]) {
+    try {
+      DataOutputStream dos = new DataOutputStream(new FixedByteArrayOutputStream(ret));
+      dos.writeShort(s);
+      return ret;
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+  
+  public short decodeShort(byte[] a) {
+    try {
+      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(a));
+      short s = dis.readShort();
+      return s;
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+  
+  public byte[] encodeInt(int i) {
+    return encodeInt(i, new byte[4]);
+  }
+  
+  public byte[] encodeInt(int i, byte ret[]) {
+    try {
+      DataOutputStream dos = new DataOutputStream(new FixedByteArrayOutputStream(ret));
+      dos.writeInt(i);
+      return ret;
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+  
+  public int decodeInt(byte[] a) {
+    try {
+      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(a));
+      int i = dis.readInt();
+      return i;
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+  
+  public byte[] encodeLong(long l) {
+    return encodeLong(l, new byte[8]);
+  }
+  
+  public byte[] encodeLong(long l, byte ret[]) {
+    try {
+      DataOutputStream dos = new DataOutputStream(new FixedByteArrayOutputStream(ret));
+      dos.writeLong(l);
+      return ret;
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+  
+  public long decodeLong(byte[] a) {
+    try {
+      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(a));
+      long l = dis.readLong();
+      return l;
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+  
+  public byte[] encodeDouble(double d) {
+    return encodeDouble(d, new byte[8]);
+  }
+  
+  public byte[] encodeDouble(double d, byte[] ret) {
+    try {
+      long l = Double.doubleToRawLongBits(d);
+      DataOutputStream dos = new DataOutputStream(new FixedByteArrayOutputStream(ret));
+      dos.writeLong(l);
+      return ret;
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+  
+  public double decodeDouble(byte[] a) {
+    try {
+      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(a));
+      long l = dis.readLong();
+      return Double.longBitsToDouble(l);
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+  
+  public byte[] encodeFloat(float d) {
+    return encodeFloat(d, new byte[4]);
+  }
+  
+  public byte[] encodeFloat(float f, byte[] ret) {
+    try {
+      int i = Float.floatToRawIntBits(f);
+      DataOutputStream dos = new DataOutputStream(new FixedByteArrayOutputStream(ret));
+      dos.writeInt(i);
+      return ret;
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+  
+  public float decodeFloat(byte[] a) {
+    try {
+      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(a));
+      int i = dis.readInt();
+      return Float.intBitsToFloat(i);
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+  
+  public byte[] encodeByte(byte b, byte[] ret) {
+    ret[0] = 0;
+    return ret;
+  }
+  
+  public byte[] encodeByte(byte b) {
+    return encodeByte(b, new byte[1]);
+  }
+  
+  public byte decodeByte(byte[] a) {
+    return a[0];
+  }
+  
+  public boolean decodeBoolean(byte[] a) {
+    try {
+      DataInputStream dis = new DataInputStream(new ByteArrayInputStream(a));
+      return dis.readBoolean();
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+  
+  public byte[] encodeBoolean(boolean b) {
+    return encodeBoolean(b, new byte[1]);
+  }
+  
+  public byte[] encodeBoolean(boolean b, byte[] ret) {
+    try {
+      DataOutputStream dos = new DataOutputStream(new FixedByteArrayOutputStream(ret));
+      dos.writeBoolean(b);
+      return ret;
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+  
+  @Override
+  public byte[] lastPossibleKey(int size, byte[] er) {
+    return Utils.lastPossibleKey(size, er);
+  }
+  
+  @Override
+  public byte[] followingKey(int size, byte[] per) {
+    return Utils.followingKey(size, per);
+  }
+}

Propchange: gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/BinaryEncoder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/Encoder.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/Encoder.java?rev=1308477&view=auto
==============================================================================
--- gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/Encoder.java (added)
+++ gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/Encoder.java Mon Apr  2 19:15:38 2012
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.accumulo.encoders;
+
+/**
+ * 
+ */
+public interface Encoder {
+  
+  public byte[] encodeByte(byte b, byte[] ret);
+  
+  public byte[] encodeByte(byte b);
+  
+  public byte decodeByte(byte[] a);
+
+  public byte[] encodeShort(short s);
+  
+  public byte[] encodeShort(short s, byte ret[]);
+  
+  public short decodeShort(byte[] a);
+  
+  public byte[] encodeInt(int i);
+  
+  public byte[] encodeInt(int i, byte ret[]);
+  
+  public int decodeInt(byte[] a);
+  
+  public byte[] encodeLong(long l);
+  
+  public byte[] encodeLong(long l, byte ret[]);
+  
+  public long decodeLong(byte[] a);
+  
+  public byte[] encodeDouble(double d);
+  
+  public byte[] encodeDouble(double d, byte[] ret);
+  
+  public double decodeDouble(byte[] a);
+  
+  public byte[] encodeFloat(float d);
+  
+  public byte[] encodeFloat(float f, byte[] ret);
+  
+  public float decodeFloat(byte[] a);
+  
+  public boolean decodeBoolean(byte[] val);
+  
+  public byte[] encodeBoolean(boolean b);
+  
+  public byte[] encodeBoolean(boolean b, byte[] ret);
+
+  byte[] followingKey(int size, byte[] per);
+
+  byte[] lastPossibleKey(int size, byte[] er);
+
+}

Propchange: gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/Encoder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/HexEncoder.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/HexEncoder.java?rev=1308477&view=auto
==============================================================================
--- gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/HexEncoder.java (added)
+++ gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/HexEncoder.java Mon Apr  2 19:15:38 2012
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.accumulo.encoders;
+
+/**
+ * Encodes data in a ascii hex representation
+ */
+
+public class HexEncoder implements Encoder {
+  
+  private byte chars[] = new byte[] {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
+
+  private void encode(byte[] a, long l) {
+    for (int i = a.length - 1; i >= 0; i--) {
+      a[i] = chars[(int) (l & 0x0f)];
+      l = l >>> 4;
+    }
+  }
+
+  private int fromChar(byte b) {
+    if (b >= '0' && b <= '9') {
+      return (b - '0');
+    } else if (b >= 'a' && b <= 'f') {
+      return (b - 'a' + 10);
+    }
+    
+    throw new IllegalArgumentException("Bad char " + b);
+  }
+  
+  private long decode(byte[] a) {
+    long b = 0;
+    for (int i = 0; i < a.length; i++) {
+      b = b << 4;
+      b |= fromChar(a[i]);
+    }
+    
+    return b;
+  }
+
+  @Override
+  public byte[] encodeByte(byte b, byte[] ret) {
+    encode(ret, 0xff & b);
+    return ret;
+  }
+  
+  @Override
+  public byte[] encodeByte(byte b) {
+    return encodeByte(b, new byte[2]);
+  }
+  
+  @Override
+  public byte decodeByte(byte[] a) {
+    return (byte) decode(a);
+  }
+  
+  @Override
+  public byte[] encodeShort(short s) {
+    return encodeShort(s, new byte[4]);
+  }
+  
+  @Override
+  public byte[] encodeShort(short s, byte[] ret) {
+    encode(ret, 0xffff & s);
+    return ret;
+  }
+  
+  @Override
+  public short decodeShort(byte[] a) {
+    return (short) decode(a);
+  }
+  
+  @Override
+  public byte[] encodeInt(int i) {
+    return encodeInt(i, new byte[8]);
+  }
+  
+  @Override
+  public byte[] encodeInt(int i, byte[] ret) {
+    encode(ret, i);
+    return ret;
+  }
+  
+  @Override
+  public int decodeInt(byte[] a) {
+    return (int) decode(a);
+  }
+  
+  @Override
+  public byte[] encodeLong(long l) {
+    return encodeLong(l, new byte[16]);
+  }
+  
+  @Override
+  public byte[] encodeLong(long l, byte[] ret) {
+    encode(ret, l);
+    return ret;
+  }
+  
+  @Override
+  public long decodeLong(byte[] a) {
+    return decode(a);
+  }
+  
+  @Override
+  public byte[] encodeDouble(double d) {
+    return encodeDouble(d, new byte[16]);
+  }
+  
+  @Override
+  public byte[] encodeDouble(double d, byte[] ret) {
+    return encodeLong(Double.doubleToRawLongBits(d), ret);
+  }
+  
+  @Override
+  public double decodeDouble(byte[] a) {
+    return Double.longBitsToDouble(decodeLong(a));
+  }
+  
+  @Override
+  public byte[] encodeFloat(float d) {
+    return encodeFloat(d, new byte[16]);
+  }
+  
+  @Override
+  public byte[] encodeFloat(float d, byte[] ret) {
+    return encodeInt(Float.floatToRawIntBits(d), ret);
+  }
+  
+  @Override
+  public float decodeFloat(byte[] a) {
+    return Float.intBitsToFloat(decodeInt(a));
+  }
+  
+  @Override
+  public boolean decodeBoolean(byte[] val) {
+    if (decodeByte(val) == 1) {
+      return true;
+    }
+    return false;
+  }
+  
+  @Override
+  public byte[] encodeBoolean(boolean b) {
+    return encodeBoolean(b, new byte[2]);
+  }
+  
+  @Override
+  public byte[] encodeBoolean(boolean b, byte[] ret) {
+    if (b)
+      encode(ret, 1);
+    else
+      encode(ret, 0);
+    
+    return ret;
+  }
+  
+  private byte[] toBinary(byte[] hex) {
+    byte[] bin = new byte[(hex.length / 2) + (hex.length % 2)];
+    
+    int j = 0;
+    for (int i = 0; i < bin.length; i++) {
+      bin[i] = (byte) (fromChar(hex[j++]) << 4);
+      if (j >= hex.length)
+        break;
+      bin[i] |= (byte) fromChar(hex[j++]);
+    }
+    
+    return bin;
+  }
+  
+  private byte[] fromBinary(byte[] bin) {
+    byte[] hex = new byte[bin.length * 2];
+    
+    int j = 0;
+    for (int i = 0; i < bin.length; i++) {
+      hex[j++] = chars[0x0f & (bin[i] >>> 4)];
+      hex[j++] = chars[0x0f & bin[i]];
+    }
+    
+    return hex;
+  }
+
+  @Override
+  public byte[] followingKey(int size, byte[] per) {
+    return fromBinary(Utils.followingKey(size, toBinary(per)));
+  }
+  
+  @Override
+  public byte[] lastPossibleKey(int size, byte[] er) {
+    return fromBinary(Utils.lastPossibleKey(size, toBinary(er)));
+  }
+  
+}

Propchange: gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/HexEncoder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/SignedBinaryEncoder.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/SignedBinaryEncoder.java?rev=1308477&view=auto
==============================================================================
--- gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/SignedBinaryEncoder.java (added)
+++ gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/SignedBinaryEncoder.java Mon Apr  2 19:15:38 2012
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.accumulo.encoders;
+
+
+/**
+ * This class transforms this bits within a primitive type so that 
+ * the bit representation sorts correctly lexographicaly. Primarily 
+ * it does some simple transformations so that negative numbers sort 
+ * before positive numbers, when compared lexographically.
+ */
+public class SignedBinaryEncoder extends BinaryEncoder {
+  
+  public byte[] encodeShort(short s, byte ret[]){
+    s = (short)((s & 0xffff) ^ 0x8000);
+    return super.encodeShort(s, ret);
+  }
+  
+  public short decodeShort(byte[] a){
+    short s = super.decodeShort(a);
+    s = (short)((s & 0xffff) ^ 0x8000);
+    return s;
+  }
+  
+  public byte[] encodeInt(int i, byte ret[]){
+    i = i ^ 0x80000000;
+    return super.encodeInt(i, ret);
+  }
+  
+  public int decodeInt(byte[] a){
+    int i = super.decodeInt(a);
+    i = i ^ 0x80000000;
+    return i;
+  }
+  
+  public byte[] encodeLong(long l, byte ret[]){
+    l = l ^ 0x8000000000000000l;
+    return super.encodeLong(l, ret);
+  }
+  
+  public long decodeLong(byte[] a) {
+    long l = super.decodeLong(a);
+    l = l ^ 0x8000000000000000l;
+    return l;
+  }
+  
+  
+  public byte[] encodeDouble(double d, byte[] ret) {
+    long l = Double.doubleToRawLongBits(d);
+    if(l < 0)
+      l = ~l;
+    else
+      l = l ^ 0x8000000000000000l;
+    return super.encodeLong(l,ret);
+  }
+  
+  public double decodeDouble(byte[] a){
+    long l = super.decodeLong(a);
+    if(l < 0)
+      l = l ^ 0x8000000000000000l;
+    else
+      l = ~l;
+    return Double.longBitsToDouble(l);
+  }
+  
+  public byte[] encodeFloat(float f, byte[] ret) {
+    int i = Float.floatToRawIntBits(f);
+    if(i < 0)
+      i = ~i;
+    else
+      i = i ^ 0x80000000;
+    
+    return super.encodeInt(i, ret);
+    
+  }
+  
+  public float decodeFloat(byte[] a){
+    int i = super.decodeInt(a);
+    if(i < 0)
+      i = i ^ 0x80000000;
+    else
+      i = ~i;
+    return Float.intBitsToFloat(i);
+  }
+  
+}

Propchange: gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/SignedBinaryEncoder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/Utils.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/Utils.java?rev=1308477&view=auto
==============================================================================
--- gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/Utils.java (added)
+++ gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/Utils.java Mon Apr  2 19:15:38 2012
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.accumulo.encoders;
+
+import java.math.BigInteger;
+import java.util.Arrays;
+
+/**
+ * 
+ */
+public class Utils {
+  private static BigInteger newPositiveBigInteger(byte[] er) {
+    byte[] copy = new byte[er.length + 1];
+    System.arraycopy(er, 0, copy, 1, er.length);
+    BigInteger bi = new BigInteger(copy);
+    return bi;
+  }
+  
+  public static byte[] lastPossibleKey(int size, byte[] er) {
+    if (size == er.length)
+      return er;
+    
+    if (er.length > size)
+      throw new IllegalArgumentException();
+    
+    BigInteger bi = newPositiveBigInteger(er);
+    if (bi.equals(BigInteger.ZERO))
+      throw new IllegalArgumentException("Nothing comes before zero");
+    
+    bi = bi.subtract(BigInteger.ONE);
+    
+    byte ret[] = new byte[size];
+    Arrays.fill(ret, (byte) 0xff);
+    
+    System.arraycopy(getBytes(bi, er.length), 0, ret, 0, er.length);
+    
+    return ret;
+  }
+  
+  private static byte[] getBytes(BigInteger bi, int minLen) {
+    byte[] ret = bi.toByteArray();
+    
+    if (ret[0] == 0) {
+      // remove leading 0 that makes num positive
+      byte copy[] = new byte[ret.length - 1];
+      System.arraycopy(ret, 1, copy, 0, copy.length);
+      ret = copy;
+    }
+    
+    // leading digits are dropped
+    byte copy[] = new byte[minLen];
+    if (bi.compareTo(BigInteger.ZERO) < 0) {
+      Arrays.fill(copy, (byte) 0xff);
+    }
+    System.arraycopy(ret, 0, copy, minLen - ret.length, ret.length);
+    
+    return copy;
+  }
+  
+  public static byte[] followingKey(int size, byte[] per) {
+    
+    if (per.length > size)
+      throw new IllegalArgumentException();
+    
+    if (size == per.length) {
+      // add one
+      BigInteger bi = new BigInteger(per);
+      bi = bi.add(BigInteger.ONE);
+      if (bi.equals(BigInteger.ZERO)) {
+        throw new IllegalArgumentException("Wrapped");
+      }
+      return getBytes(bi, size);
+    } else {
+      return Arrays.copyOf(per, size);
+    }
+  }
+}

Propchange: gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/Utils.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloQuery.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloQuery.java?rev=1308477&view=auto
==============================================================================
--- gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloQuery.java (added)
+++ gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloQuery.java Mon Apr  2 19:15:38 2012
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.accumulo.query;
+
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.query.impl.QueryBase;
+import org.apache.gora.store.DataStore;
+
+/**
+ * 
+ */
+public class AccumuloQuery<K,T extends Persistent> extends QueryBase<K,T> {
+  
+  public AccumuloQuery() {
+    super(null);
+  }
+
+  public AccumuloQuery(DataStore<K,T> dataStore) {
+    super(dataStore);
+  }
+  
+}

Propchange: gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloQuery.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloResult.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloResult.java?rev=1308477&view=auto
==============================================================================
--- gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloResult.java (added)
+++ gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloResult.java Mon Apr  2 19:15:38 2012
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.accumulo.query;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.RowIterator;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.gora.accumulo.store.AccumuloStore;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.impl.ResultBase;
+import org.apache.gora.store.DataStore;
+
+/**
+ * 
+ */
+public class AccumuloResult<K,T extends Persistent> extends ResultBase<K,T> {
+  
+  private RowIterator iterator;
+
+  public AccumuloStore<K,T> getDataStore() {
+    return (AccumuloStore<K,T>) super.getDataStore();
+  }
+
+  /**
+   * @param dataStore
+   * @param query
+   * @param scanner
+   */
+  public AccumuloResult(DataStore<K,T> dataStore, Query<K,T> query, Scanner scanner) {
+    super(dataStore, query);
+    
+    // TODO set batch size based on limit, and construct iterator later
+    iterator = new RowIterator(scanner.iterator());
+  }
+  
+  @Override
+  public float getProgress() throws IOException {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+  
+  @Override
+  public void close() throws IOException {
+    
+  }
+  
+  @Override
+  protected boolean nextInner() throws IOException {
+    
+    if (!iterator.hasNext())
+      return false;
+    
+    key = null;
+    
+    Iterator<Entry<Key,Value>> nextRow = iterator.next();
+    ByteSequence row = getDataStore().populate(nextRow, persistent);
+    key = (K) ((AccumuloStore) dataStore).fromBytes(getKeyClass(), row.toArray());
+    
+    return true;
+  }
+  
+}

Propchange: gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/query/AccumuloResult.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloMapping.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloMapping.java?rev=1308477&view=auto
==============================================================================
--- gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloMapping.java (added)
+++ gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloMapping.java Mon Apr  2 19:15:38 2012
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.accumulo.store;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.accumulo.core.util.Pair;
+import org.apache.hadoop.io.Text;
+
+public class AccumuloMapping {
+  Map<String,Pair<Text,Text>> fieldMap = new HashMap<String,Pair<Text,Text>>();
+  Map<Pair<Text,Text>,String> columnMap = new HashMap<Pair<Text,Text>,String>();
+  Map<String,String> tableConfig = new HashMap<String,String>();
+  String tableName;
+  String encoder;
+
+}

Propchange: gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloMapping.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java?rev=1308477&view=auto
==============================================================================
--- gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java (added)
+++ gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java Mon Apr  2 19:15:38 2012
@@ -0,0 +1,840 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.accumulo.store;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.IsolatedScanner;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.RowIterator;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableDeletedException;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableOfflineException;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.impl.Tables;
+import org.apache.accumulo.core.client.impl.TabletLocator;
+import org.apache.accumulo.core.client.mock.MockConnector;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.mock.MockTabletLocator;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyExtent;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.SortedKeyIterator;
+import org.apache.accumulo.core.iterators.user.TimestampFilter;
+import org.apache.accumulo.core.master.state.tables.TableState;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.accumulo.core.security.thrift.AuthInfo;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.util.TextUtil;
+import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericArray;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.accumulo.encoders.Encoder;
+import org.apache.gora.accumulo.query.AccumuloQuery;
+import org.apache.gora.accumulo.query.AccumuloResult;
+import org.apache.gora.persistency.ListGenericArray;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.State;
+import org.apache.gora.persistency.StateManager;
+import org.apache.gora.persistency.StatefulHashMap;
+import org.apache.gora.persistency.StatefulMap;
+import org.apache.gora.query.PartitionQuery;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.query.impl.PartitionQueryImpl;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.gora.store.impl.DataStoreBase;
+import org.apache.gora.util.AvroUtils;
+import org.apache.hadoop.io.Text;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.NodeList;
+
+/**
+ * 
+ */
+public class AccumuloStore<K,T extends Persistent> extends DataStoreBase<K,T> {
+  
+  protected static final String MOCK_PROPERTY = "accumulo.mock";
+  protected static final String INSTANCE_NAME_PROPERTY = "accumulo.instance";
+  protected static final String ZOOKEEPERS_NAME_PROPERTY = "accumulo.zookeepers";
+  protected static final String USERNAME_PROPERTY = "accumulo.user";
+  protected static final String PASSWORD_PROPERTY = "accumulo.password";
+  protected static final String DEFAULT_MAPPING_FILE = "gora-accumulo-mapping.xml";
+
+  private Connector conn;
+  private BatchWriter batchWriter;
+  private AccumuloMapping mapping;
+  private AuthInfo authInfo;
+  private Encoder encoder;
+  
+  public Object fromBytes(Schema schema, byte data[]) {
+    return fromBytes(encoder, schema, data);
+  }
+
+  public static Object fromBytes(Encoder encoder, Schema schema, byte data[]) {
+    switch (schema.getType()) {
+      case BOOLEAN:
+        return encoder.decodeBoolean(data);
+      case DOUBLE:
+        return encoder.decodeDouble(data);
+      case FLOAT:
+        return encoder.decodeFloat(data);
+      case INT:
+        return encoder.decodeInt(data);
+      case LONG:
+        return encoder.decodeLong(data);
+      case STRING:
+        return new Utf8(data);
+      case BYTES:
+        return ByteBuffer.wrap(data);
+      case ENUM:
+        return AvroUtils.getEnumValue(schema, encoder.decodeInt(data));
+    }
+    throw new IllegalArgumentException("Unknown type " + schema.getType());
+    
+  }
+
+  public K fromBytes(Class<K> clazz, byte[] val) {
+    return fromBytes(encoder, clazz, val);
+  }
+
+  @SuppressWarnings("unchecked")
+  public static <K> K fromBytes(Encoder encoder, Class<K> clazz, byte[] val) {
+    try {
+      if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
+        return (K) Byte.valueOf(encoder.decodeByte(val));
+      } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) {
+        return (K) Boolean.valueOf(encoder.decodeBoolean(val));
+      } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) {
+        return (K) Short.valueOf(encoder.decodeShort(val));
+      } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) {
+        return (K) Integer.valueOf(encoder.decodeInt(val));
+      } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) {
+        return (K) Long.valueOf(encoder.decodeLong(val));
+      } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) {
+        return (K) Float.valueOf(encoder.decodeFloat(val));
+      } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) {
+        return (K) Double.valueOf(encoder.decodeDouble(val));
+      } else if (clazz.equals(String.class)) {
+        return (K) new String(val, "UTF-8");
+      } else if (clazz.equals(Utf8.class)) {
+        return (K) new Utf8(val);
+      }
+      
+      throw new IllegalArgumentException("Unknown type " + clazz.getName());
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  private static byte[] copyIfNeeded(byte b[], int offset, int len) {
+    if (len != b.length || offset != 0) {
+      byte copy[] = new byte[len];
+      System.arraycopy(b, offset, copy, 0, copy.length);
+      b = copy;
+    }
+    return b;
+  }
+
+  public byte[] toBytes(Object o) {
+    return toBytes(encoder, o);
+  }
+  
+  public static byte[] toBytes(Encoder encoder, Object o) {
+    
+    try {
+      if (o instanceof String) {
+        return ((String) o).getBytes("UTF-8");
+      } else if (o instanceof Utf8) {
+        return copyIfNeeded(((Utf8) o).getBytes(), 0, ((Utf8) o).getLength());
+      } else if (o instanceof ByteBuffer) {
+        return copyIfNeeded(((ByteBuffer) o).array(), ((ByteBuffer) o).arrayOffset() + ((ByteBuffer) o).position(), ((ByteBuffer) o).remaining());
+      } else if (o instanceof Long) {
+        return encoder.encodeLong((Long) o);
+      } else if (o instanceof Integer) {
+        return encoder.encodeInt((Integer) o);
+      } else if (o instanceof Short) {
+        return encoder.encodeShort((Short) o);
+      } else if (o instanceof Byte) {
+        return encoder.encodeByte((Byte) o);
+      } else if (o instanceof Boolean) {
+        return encoder.encodeBoolean((Boolean) o);
+      } else if (o instanceof Float) {
+        return encoder.encodeFloat((Float) o);
+      } else if (o instanceof Double) {
+        return encoder.encodeDouble((Double) o);
+      } else if (o instanceof Enum) {
+        return encoder.encodeInt(((Enum) o).ordinal());
+      }
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+    
+    throw new IllegalArgumentException("Uknown type " + o.getClass().getName());
+  }
+
+  private BatchWriter getBatchWriter() throws IOException {
+    if (batchWriter == null)
+      try {
+        batchWriter = conn.createBatchWriter(mapping.tableName, 10000000, 60000l, 4);
+      } catch (TableNotFoundException e) {
+        throw new IOException(e);
+      }
+    return batchWriter;
+  }
+
+  @Override
+  public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws IOException {
+    super.initialize(keyClass, persistentClass, properties);
+
+    String mock = DataStoreFactory.findProperty(properties, this, MOCK_PROPERTY, null);
+    String mappingFile = DataStoreFactory.getMappingFile(properties, this, DEFAULT_MAPPING_FILE);
+    String user = DataStoreFactory.findProperty(properties, this, USERNAME_PROPERTY, null);
+    String password = DataStoreFactory.findProperty(properties, this, PASSWORD_PROPERTY, null);
+    
+    mapping = readMapping(mappingFile);
+
+    if (mapping.encoder == null || mapping.encoder.equals("")) {
+      encoder = new org.apache.gora.accumulo.encoders.BinaryEncoder();
+    } else {
+      try {
+        encoder = (Encoder) getClass().getClassLoader().loadClass(mapping.encoder).newInstance();
+      } catch (InstantiationException e) {
+        throw new IOException(e);
+      } catch (IllegalAccessException e) {
+        throw new IOException(e);
+      } catch (ClassNotFoundException e) {
+        throw new IOException(e);
+      }
+    }
+
+    try {
+      if (mock == null || !mock.equals("true")) {
+        String instance = DataStoreFactory.findProperty(properties, this, INSTANCE_NAME_PROPERTY, null);
+        String zookeepers = DataStoreFactory.findProperty(properties, this, ZOOKEEPERS_NAME_PROPERTY, null);
+        conn = new ZooKeeperInstance(instance, zookeepers).getConnector(user, password);
+        authInfo = new AuthInfo(user, ByteBuffer.wrap(password.getBytes()), conn.getInstance().getInstanceID());
+      } else {
+        conn = new MockInstance().getConnector(user, password);
+      }
+
+      if (autoCreateSchema)
+        createSchema();
+    } catch (AccumuloException e) {
+      throw new IOException(e);
+    } catch (AccumuloSecurityException e) {
+      throw new IOException(e);
+    }
+  }
+  
+  protected AccumuloMapping readMapping(String filename) throws IOException {
+    try {
+      
+      AccumuloMapping mapping = new AccumuloMapping();
+
+      DocumentBuilder db = DocumentBuilderFactory.newInstance().newDocumentBuilder();
+      Document dom = db.parse(getClass().getClassLoader().getResourceAsStream(filename));
+      
+      Element root = dom.getDocumentElement();
+      
+      NodeList nl = root.getElementsByTagName("class");
+      for (int i = 0; i < nl.getLength(); i++) {
+        
+        Element classElement = (Element) nl.item(i);
+        if (classElement.getAttribute("keyClass").equals(keyClass.getCanonicalName())
+            && classElement.getAttribute("name").equals(persistentClass.getCanonicalName())) {
+
+          mapping.tableName = getSchemaName(classElement.getAttribute("table"), persistentClass);
+          mapping.encoder = classElement.getAttribute("encoder");
+          
+          NodeList fields = classElement.getElementsByTagName("field");
+          for (int j = 0; j < fields.getLength(); j++) {
+            Element fieldElement = (Element) fields.item(j);
+
+            String name = fieldElement.getAttribute("name");
+            String family = fieldElement.getAttribute("family");
+            String qualifier = fieldElement.getAttribute("qualifier");
+            if (qualifier.equals(""))
+              qualifier = null;
+
+            Pair<Text,Text> col = new Pair<Text,Text>(new Text(family), qualifier == null ? null : new Text(qualifier));
+            mapping.fieldMap.put(name, col);
+            mapping.columnMap.put(col, name);
+          }
+        }
+
+      }
+      
+      nl = root.getElementsByTagName("table");
+      for (int i = 0; i < nl.getLength(); i++) {
+        Element tableElement = (Element) nl.item(i);
+        if (tableElement.getAttribute("name").equals(mapping.tableName)) {
+          NodeList configs = tableElement.getElementsByTagName("config");
+          for (int j = 0; j < configs.getLength(); j++) {
+            Element configElement = (Element) configs.item(j);
+            String key = configElement.getAttribute("key");
+            String val = configElement.getAttribute("value");
+            mapping.tableConfig.put(key, val);
+          }
+        }
+      }
+
+      return mapping;
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
+
+  }
+  
+  @Override
+  public String getSchemaName() {
+    return mapping.tableName;
+  }
+  
+  @Override
+  public void createSchema() throws IOException {
+    try {
+      conn.tableOperations().create(mapping.tableName);
+      Set<Entry<String,String>> es = mapping.tableConfig.entrySet();
+      for (Entry<String,String> entry : es) {
+        conn.tableOperations().setProperty(mapping.tableName, entry.getKey(), entry.getValue());
+      }
+
+    } catch (AccumuloException e) {
+      throw new IOException(e);
+    } catch (AccumuloSecurityException e) {
+      throw new IOException(e);
+    } catch (TableExistsException e) {
+      return;
+    }
+  }
+
+  @Override
+  public void deleteSchema() throws IOException {
+    try {
+      if (batchWriter != null)
+        batchWriter.close();
+      batchWriter = null;
+      conn.tableOperations().delete(mapping.tableName);
+    } catch (AccumuloException e) {
+      throw new IOException(e);
+    } catch (AccumuloSecurityException e) {
+      throw new IOException(e);
+    } catch (TableNotFoundException e) {
+      return;
+    }
+  }
+
+  @Override
+  public boolean schemaExists() throws IOException {
+    return conn.tableOperations().exists(mapping.tableName);
+  }
+
+  public ByteSequence populate(Iterator<Entry<Key,Value>> iter, T persistent) throws IOException {
+    ByteSequence row = null;
+    
+    Map currentMap = null;
+    ArrayList currentArray = null;
+    Text currentFam = null;
+    int currentPos = 0;
+    Schema currentSchema = null;
+    Field currentField = null;
+
+    while (iter.hasNext()) {
+      Entry<Key,Value> entry = iter.next();
+      
+      if (currentMap != null) {
+        if (currentFam.equals(entry.getKey().getColumnFamily())) {
+          currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()), fromBytes(currentSchema, entry.getValue().get()));
+          continue;
+        } else {
+          persistent.put(currentPos, currentMap);
+          currentMap = null;
+        }
+      } else if (currentArray != null) {
+        if (currentFam.equals(entry.getKey().getColumnFamily())) {
+          currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
+          continue;
+        } else {
+          persistent.put(currentPos, new ListGenericArray<T>(currentField.schema(), currentArray));
+          currentArray = null;
+        }
+      }
+
+      if (row == null)
+        row = entry.getKey().getRowData();
+      
+      String fieldName = mapping.columnMap.get(new Pair<Text,Text>(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier()));
+      if (fieldName == null)
+        fieldName = mapping.columnMap.get(new Pair<Text,Text>(entry.getKey().getColumnFamily(), null));
+
+      Field field = fieldMap.get(fieldName);
+
+      switch (field.schema().getType()) {
+        case MAP:
+          currentMap = new StatefulHashMap();
+          currentPos = field.pos();
+          currentFam = entry.getKey().getColumnFamily();
+          currentSchema = field.schema().getValueType();
+          
+          currentMap.put(new Utf8(entry.getKey().getColumnQualifierData().toArray()), fromBytes(currentSchema, entry.getValue().get()));
+
+          break;
+        case ARRAY:
+          currentArray = new ArrayList();
+          currentPos = field.pos();
+          currentFam = entry.getKey().getColumnFamily();
+          currentSchema = field.schema().getElementType();
+          currentField = field;
+          
+          currentArray.add(fromBytes(currentSchema, entry.getValue().get()));
+
+          break;
+        case RECORD:
+          SpecificDatumReader reader = new SpecificDatumReader(field.schema());
+          byte[] val = entry.getValue().get();
+          // TODO reuse decoder
+          BinaryDecoder decoder = DecoderFactory.defaultFactory().createBinaryDecoder(val, null);
+          persistent.put(field.pos(), reader.read(null, decoder));
+          break;
+        default:
+          persistent.put(field.pos(), fromBytes(field.schema(), entry.getValue().get()));
+      }
+    }
+    
+    if (currentMap != null) {
+      persistent.put(currentPos, currentMap);
+    } else if (currentArray != null) {
+      persistent.put(currentPos, new ListGenericArray<T>(currentField.schema(), currentArray));
+    }
+
+    persistent.clearDirty();
+
+    return row;
+  }
+
+  private void setFetchColumns(Scanner scanner, String fields[]) {
+    fields = getFieldsToQuery(fields);
+    for (String field : fields) {
+      Pair<Text,Text> col = mapping.fieldMap.get(field);
+      if (col.getSecond() == null) {
+        scanner.fetchColumnFamily(col.getFirst());
+      } else {
+        scanner.fetchColumn(col.getFirst(), col.getSecond());
+      }
+    }
+  }
+
+  @Override
+  public T get(K key, String[] fields) throws IOException {
+    try {
+      // TODO make isolated scanner optional?
+      Scanner scanner = new IsolatedScanner(conn.createScanner(mapping.tableName, Constants.NO_AUTHS));
+      Range rowRange = new Range(new Text(toBytes(key)));
+      
+      scanner.setRange(rowRange);
+      setFetchColumns(scanner, fields);
+      
+      T persistent = newPersistent();
+      ByteSequence row = populate(scanner.iterator(), persistent);
+      if (row == null)
+        return null;
+      return persistent;
+    } catch (TableNotFoundException e) {
+      return null;
+    }
+  }
+  
+  @Override
+  public void put(K key, T val) throws IOException {
+
+    Mutation m = new Mutation(new Text(toBytes(key)));
+    
+    Schema schema = val.getSchema();
+    StateManager stateManager = val.getStateManager();
+    
+    Iterator<Field> iter = schema.getFields().iterator();
+    
+    int count = 0;
+    for (int i = 0; iter.hasNext(); i++) {
+      Field field = iter.next();
+      if (!stateManager.isDirty(val, i)) {
+        continue;
+      }
+      
+      Object o = val.get(i);
+      Pair<Text,Text> col = mapping.fieldMap.get(field.name());
+
+      switch (field.schema().getType()) {
+        case MAP:
+          if (o instanceof StatefulMap) {
+            StatefulMap map = (StatefulMap) o;
+            Set<?> es = map.states().entrySet();
+            for (Object entry : es) {
+              Object mapKey = ((Entry) entry).getKey();
+              State state = (State) ((Entry) entry).getValue();
+
+              switch (state) {
+                case NEW:
+                case DIRTY:
+                  m.put(col.getFirst(), new Text(toBytes(mapKey)), new Value(toBytes(map.get(mapKey))));
+                  count++;
+                  break;
+                case DELETED:
+                  m.putDelete(col.getFirst(), new Text(toBytes(mapKey)));
+                  count++;
+                  break;
+              }
+              
+            }
+          } else {
+            Map map = (Map) o;
+            Set<?> es = map.entrySet();
+            for (Object entry : es) {
+              Object mapKey = ((Entry) entry).getKey();
+              Object mapVal = ((Entry) entry).getValue();
+              m.put(col.getFirst(), new Text(toBytes(mapKey)), new Value(toBytes(mapVal)));
+              count++;
+            }
+          }
+          break;
+        case ARRAY:
+          GenericArray array = (GenericArray) o;
+          int j = 0;
+          for (Object item : array) {
+            m.put(col.getFirst(), new Text(toBytes(j++)), new Value(toBytes(item)));
+            count++;
+          }
+          break;
+        case RECORD:
+          SpecificDatumWriter writer = new SpecificDatumWriter(field.schema());
+          ByteArrayOutputStream os = new ByteArrayOutputStream();
+          BinaryEncoder encoder = new BinaryEncoder(os);
+          writer.write(o, encoder);
+          encoder.flush();
+          m.put(col.getFirst(), col.getSecond(), new Value(os.toByteArray()));
+          break;
+        default:
+          m.put(col.getFirst(), col.getSecond(), new Value(toBytes(o)));
+          count++;
+      }
+
+    }
+    
+    if (count > 0)
+      try {
+        getBatchWriter().addMutation(m);
+      } catch (MutationsRejectedException e) {
+        throw new IOException(e);
+      }
+  }
+  
+  @Override
+  public boolean delete(K key) throws IOException {
+    Query<K,T> q = newQuery();
+    q.setKey(key);
+    return deleteByQuery(q) > 0;
+  }
+
+  @Override
+  public long deleteByQuery(Query<K,T> query) throws IOException {
+    try {
+      Scanner scanner = createScanner(query);
+      // add iterator that drops values on the server side
+      scanner.addScanIterator(new IteratorSetting(Integer.MAX_VALUE, SortedKeyIterator.class));
+      RowIterator iterator = new RowIterator(scanner.iterator());
+      
+      long count = 0;
+
+      while (iterator.hasNext()) {
+        Iterator<Entry<Key,Value>> row = iterator.next();
+        Mutation m = null;
+        while (row.hasNext()) {
+          Entry<Key,Value> entry = row.next();
+          Key key = entry.getKey();
+          if (m == null)
+            m = new Mutation(key.getRow());
+          // TODO optimize to avoid continually creating column vis? prob does not matter for empty
+          m.putDelete(key.getColumnFamily(), key.getColumnQualifier(), new ColumnVisibility(key.getColumnVisibility()), key.getTimestamp());
+        }
+        getBatchWriter().addMutation(m);
+        count++;
+      }
+      
+      return count;
+    } catch (TableNotFoundException e) {
+      // TODO return 0?
+      throw new IOException(e);
+    } catch (MutationsRejectedException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private Range createRange(Query<K,T> query) {
+    Text startRow = null;
+    Text endRow = null;
+    
+    if (query.getStartKey() != null)
+      startRow = new Text(toBytes(query.getStartKey()));
+    
+    if (query.getEndKey() != null)
+      endRow = new Text(toBytes(query.getEndKey()));
+    
+    return new Range(startRow, true, endRow, true);
+    
+  }
+  
+  private Scanner createScanner(Query<K,T> query) throws TableNotFoundException {
+    // TODO make isolated scanner optional?
+    Scanner scanner = new IsolatedScanner(conn.createScanner(mapping.tableName, Constants.NO_AUTHS));
+    setFetchColumns(scanner, query.getFields());
+    
+    scanner.setRange(createRange(query));
+    
+    if (query.getStartTime() != -1 || query.getEndTime() != -1) {
+      IteratorSetting is = new IteratorSetting(30, TimestampFilter.class);
+      if (query.getStartTime() != -1)
+        TimestampFilter.setStart(is, query.getStartTime(), true);
+      if (query.getEndTime() != -1)
+        TimestampFilter.setEnd(is, query.getEndTime(), true);
+      
+      scanner.addScanIterator(is);
+    }
+    
+    return scanner;
+  }
+
+  @Override
+  public Result<K,T> execute(Query<K,T> query) throws IOException {
+    try {
+      Scanner scanner = createScanner(query);
+      return new AccumuloResult<K,T>(this, query, scanner);
+    } catch (TableNotFoundException e) {
+      // TODO return empty result?
+      throw new IOException(e);
+    }
+  }
+  
+  @Override
+  public Query<K,T> newQuery() {
+    return new AccumuloQuery<K,T>(this);
+  }
+
+  Text pad(Text key, int bytes) {
+    if (key.getLength() < bytes)
+      key = new Text(key);
+    
+    while (key.getLength() < bytes) {
+      key.append(new byte[] {0}, 0, 1);
+    }
+    
+    return key;
+  }
+  
+  @Override
+  public List<PartitionQuery<K,T>> getPartitions(Query<K,T> query) throws IOException {
+    try {
+      TabletLocator tl;
+      if (conn instanceof MockConnector)
+        tl = new MockTabletLocator();
+      else
+        tl = TabletLocator.getInstance(conn.getInstance(), authInfo, new Text(Tables.getTableId(conn.getInstance(), mapping.tableName)));
+      
+      Map<String,Map<KeyExtent,List<Range>>> binnedRanges = new HashMap<String,Map<KeyExtent,List<Range>>>();
+      
+      while (tl.binRanges(Collections.singletonList(createRange(query)), binnedRanges).size() > 0) {
+        // TODO log?
+        if (!Tables.exists(conn.getInstance(), Tables.getTableId(conn.getInstance(), mapping.tableName)))
+          throw new TableDeletedException(Tables.getTableId(conn.getInstance(), mapping.tableName));
+        else if (Tables.getTableState(conn.getInstance(), Tables.getTableId(conn.getInstance(), mapping.tableName)) == TableState.OFFLINE)
+          throw new TableOfflineException(conn.getInstance(), Tables.getTableId(conn.getInstance(), mapping.tableName));
+        UtilWaitThread.sleep(100);
+      }
+      
+      List<PartitionQuery<K,T>> ret = new ArrayList<PartitionQuery<K,T>>();
+      
+      Text startRow = null;
+      Text endRow = null;
+      if (query.getStartKey() != null)
+        startRow = new Text(toBytes(query.getStartKey()));
+      if (query.getEndKey() != null)
+        endRow = new Text(toBytes(query.getEndKey()));
+     
+      //hadoop expects hostnames, accumulo keeps track of IPs... so need to convert
+      HashMap<String,String> hostNameCache = new HashMap<String,String>();
+ 
+      for (Entry<String,Map<KeyExtent,List<Range>>> entry : binnedRanges.entrySet()) {
+        String ip = entry.getKey().split(":", 2)[0];
+        String location = hostNameCache.get(ip);
+        if (location == null) {
+          InetAddress inetAddress = InetAddress.getByName(ip);
+          location = inetAddress.getHostName();
+          hostNameCache.put(ip, location);
+        }
+
+        Map<KeyExtent,List<Range>> tablets = entry.getValue();
+        for (KeyExtent ke : tablets.keySet()) {
+          
+          K startKey = null;
+          if (startRow == null || !ke.contains(startRow)) {
+            if (ke.getPrevEndRow() != null) {
+              startKey = followingKey(encoder, getKeyClass(), TextUtil.getBytes(ke.getPrevEndRow()));
+            }
+          } else {
+            startKey = fromBytes(getKeyClass(), TextUtil.getBytes(startRow));
+          }
+          
+          K endKey = null;
+          if (endRow == null || !ke.contains(endRow)) {
+            if (ke.getEndRow() != null)
+              endKey = lastPossibleKey(encoder, getKeyClass(), TextUtil.getBytes(ke.getEndRow()));
+          } else {
+            endKey = fromBytes(getKeyClass(), TextUtil.getBytes(endRow));
+          }
+          
+          PartitionQueryImpl pqi = new PartitionQueryImpl<K,T>(query, startKey, endKey, new String[] {location});
+          ret.add(pqi);
+        }
+      }
+      
+      return ret;
+    } catch (TableNotFoundException e) {
+      throw new IOException(e);
+    } catch (AccumuloException e) {
+      throw new IOException(e);
+    } catch (AccumuloSecurityException e) {
+      throw new IOException(e);
+    }
+    
+  }
+  
+  static <K> K lastPossibleKey(Encoder encoder, Class<K> clazz, byte[] er) {
+    
+    if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
+      throw new UnsupportedOperationException();
+    } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) {
+      throw new UnsupportedOperationException();
+    } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) {
+      return fromBytes(encoder, clazz, encoder.lastPossibleKey(2, er));
+    } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) {
+      return fromBytes(encoder, clazz, encoder.lastPossibleKey(4, er));
+    } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) {
+      return fromBytes(encoder, clazz, encoder.lastPossibleKey(8, er));
+    } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) {
+      return fromBytes(encoder, clazz, encoder.lastPossibleKey(4, er));
+    } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) {
+      return fromBytes(encoder, clazz, encoder.lastPossibleKey(8, er));
+    } else if (clazz.equals(String.class)) {
+      throw new UnsupportedOperationException();
+    } else if (clazz.equals(Utf8.class)) {
+      return fromBytes(encoder, clazz, er);
+    }
+    
+    throw new IllegalArgumentException("Unknown type " + clazz.getName());
+  }
+
+
+  
+  /**
+   * @param keyClass
+   * @param bytes
+   * @return
+   */
+  static <K> K followingKey(Encoder encoder, Class<K> clazz, byte[] per) {
+    
+    if (clazz.equals(Byte.TYPE) || clazz.equals(Byte.class)) {
+      return (K) Byte.valueOf(encoder.followingKey(1, per)[0]);
+    } else if (clazz.equals(Boolean.TYPE) || clazz.equals(Boolean.class)) {
+      throw new UnsupportedOperationException();
+    } else if (clazz.equals(Short.TYPE) || clazz.equals(Short.class)) {
+      return fromBytes(encoder, clazz, encoder.followingKey(2, per));
+    } else if (clazz.equals(Integer.TYPE) || clazz.equals(Integer.class)) {
+      return fromBytes(encoder, clazz, encoder.followingKey(4, per));
+    } else if (clazz.equals(Long.TYPE) || clazz.equals(Long.class)) {
+      return fromBytes(encoder, clazz, encoder.followingKey(8, per));
+    } else if (clazz.equals(Float.TYPE) || clazz.equals(Float.class)) {
+      return fromBytes(encoder, clazz, encoder.followingKey(4, per));
+    } else if (clazz.equals(Double.TYPE) || clazz.equals(Double.class)) {
+      return fromBytes(encoder, clazz, encoder.followingKey(8, per));
+    } else if (clazz.equals(String.class)) {
+      throw new UnsupportedOperationException();
+    } else if (clazz.equals(Utf8.class)) {
+      return fromBytes(encoder, clazz, Arrays.copyOf(per, per.length + 1));
+    }
+
+    throw new IllegalArgumentException("Unknown type " + clazz.getName());
+  }
+
+  @Override
+  public void flush() throws IOException {
+    try {
+      if (batchWriter != null) {
+        batchWriter.flush();
+      }
+    } catch (MutationsRejectedException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      if (batchWriter != null) {
+        batchWriter.close();
+        batchWriter = null;
+      }
+    } catch (MutationsRejectedException e) {
+      throw new IOException(e);
+    }
+    
+  }
+}

Propchange: gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/store/AccumuloStore.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/util/FixedByteArrayOutputStream.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/util/FixedByteArrayOutputStream.java?rev=1308477&view=auto
==============================================================================
--- gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/util/FixedByteArrayOutputStream.java (added)
+++ gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/util/FixedByteArrayOutputStream.java Mon Apr  2 19:15:38 2012
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.accumulo.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class FixedByteArrayOutputStream extends OutputStream {
+  
+  private int i;
+  byte out[];
+  
+  public FixedByteArrayOutputStream(byte out[]) {
+    this.out = out;
+  }
+  
+  @Override
+  public void write(int b) throws IOException {
+    out[i++] = (byte) b;
+  }
+  
+  @Override
+  public void write(byte b[], int off, int len) throws IOException {
+    System.arraycopy(b, off, out, i, len);
+    i += len;
+  }
+  
+}
\ No newline at end of file

Propchange: gora/trunk/gora-accumulo/src/main/java/org/apache/gora/accumulo/util/FixedByteArrayOutputStream.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: gora/trunk/gora-accumulo/src/test/java/org/apache/gora/accumulo/store/AccumuloStoreTest.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-accumulo/src/test/java/org/apache/gora/accumulo/store/AccumuloStoreTest.java?rev=1308477&view=auto
==============================================================================
--- gora/trunk/gora-accumulo/src/test/java/org/apache/gora/accumulo/store/AccumuloStoreTest.java (added)
+++ gora/trunk/gora-accumulo/src/test/java/org/apache/gora/accumulo/store/AccumuloStoreTest.java Mon Apr  2 19:15:38 2012
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.accumulo.store;
+
+import java.io.IOException;
+
+import org.apache.gora.examples.generated.Employee;
+import org.apache.gora.examples.generated.WebPage;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.gora.store.DataStoreTestBase;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * 
+ */
+public class AccumuloStoreTest extends DataStoreTestBase {
+  
+  // TODO implement test driver
+
+  @Override
+  protected DataStore<String,Employee> createEmployeeDataStore() throws IOException {
+    return DataStoreFactory.getDataStore(String.class, Employee.class, new Configuration());
+  }
+  
+  @Override
+  protected DataStore<String,WebPage> createWebPageDataStore() throws IOException {
+    return DataStoreFactory.getDataStore(String.class, WebPage.class, new Configuration());
+  }
+  
+}

Propchange: gora/trunk/gora-accumulo/src/test/java/org/apache/gora/accumulo/store/AccumuloStoreTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: gora/trunk/gora-accumulo/src/test/java/org/apache/gora/accumulo/store/PartitionTest.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-accumulo/src/test/java/org/apache/gora/accumulo/store/PartitionTest.java?rev=1308477&view=auto
==============================================================================
--- gora/trunk/gora-accumulo/src/test/java/org/apache/gora/accumulo/store/PartitionTest.java (added)
+++ gora/trunk/gora-accumulo/src/test/java/org/apache/gora/accumulo/store/PartitionTest.java Mon Apr  2 19:15:38 2012
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.accumulo.store;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.gora.accumulo.encoders.Encoder;
+import org.apache.gora.accumulo.encoders.SignedBinaryEncoder;
+import org.junit.Test;
+
+/**
+ * 
+ */
+public class PartitionTest {
+  // TODO test more types
+
+  private static Encoder encoder = new SignedBinaryEncoder();
+
+  static long encl(long l) {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+    try {
+      dos.writeLong(l);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return encoder.decodeLong(baos.toByteArray());
+  }
+
+  @Test
+  public void test1() {
+    Assert.assertEquals(encl(0x006f000000000000l), (long) AccumuloStore.followingKey(encoder, Long.class, new byte[] {0x00, 0x6f}));
+    Assert.assertEquals(encl(1l), (long) AccumuloStore.followingKey(encoder, Long.class, new byte[] {0, 0, 0, 0, 0, 0, 0, 0}));
+    Assert.assertEquals(encl(0x106f000000000001l), (long) AccumuloStore.followingKey(encoder, Long.class, new byte[] {0x10, 0x6f, 0, 0, 0, 0, 0, 0}));
+    Assert.assertEquals(
+        encl(-1l),
+        (long) AccumuloStore.followingKey(encoder, Long.class, new byte[] {(byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff,
+            (byte) 0xff,
+            (byte) 0xfe}));
+    
+    Assert.assertEquals(encl(0x8000000000000001l), (long) AccumuloStore.followingKey(encoder, Long.class, new byte[] {(byte) 0x80, 0, 0, 0, 0, 0, 0, 0}));
+    Assert.assertEquals(
+        encl(0x8000000000000000l),
+        (long) AccumuloStore.followingKey(encoder, Long.class, new byte[] {(byte) 0x7f, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff,
+            (byte) 0xff,
+            (byte) 0xff}));
+
+
+    try {
+      AccumuloStore.followingKey(encoder, Long.class,
+          new byte[] {(byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff, (byte) 0xff});
+      Assert.assertTrue(false);
+    } catch (IllegalArgumentException iea) {
+      
+    }
+  }
+  
+  @Test
+  public void test2() {
+    Assert.assertEquals(encl(0x00ffffffffffffffl), (long) AccumuloStore.lastPossibleKey(encoder, Long.class, new byte[] {0x01}));
+    Assert.assertEquals(encl(0x006effffffffffffl), (long) AccumuloStore.lastPossibleKey(encoder, Long.class, new byte[] {0x00, 0x6f}));
+    Assert.assertEquals(encl(0xff6effffffffffffl), (long) AccumuloStore.lastPossibleKey(encoder, Long.class, new byte[] {(byte) 0xff, 0x6f}));
+    Assert.assertEquals(encl(0xfffeffffffffffffl), (long) AccumuloStore.lastPossibleKey(encoder, Long.class, new byte[] {(byte) 0xff, (byte) 0xff}));
+    Assert.assertEquals(encl(0l), (long) AccumuloStore.lastPossibleKey(encoder, Long.class, new byte[] {(byte) 0, 0, 0, 0, 0, 0, 0, 0}));
+    
+    Assert.assertEquals(encl(0x7effffffffffffffl), (long) AccumuloStore.lastPossibleKey(encoder, Long.class, new byte[] {(byte) 0x7f}));
+    Assert.assertEquals(encl(0x7fffffffffffffffl), (long) AccumuloStore.lastPossibleKey(encoder, Long.class, new byte[] {(byte) 0x80}));
+    Assert.assertEquals(encl(0x80ffffffffffffffl), (long) AccumuloStore.lastPossibleKey(encoder, Long.class, new byte[] {(byte) 0x81}));
+
+    try {
+      AccumuloStore.lastPossibleKey(encoder, Long.class, new byte[] {(byte) 0, 0, 0, 0, 0, 0, 0});
+      Assert.assertTrue(false);
+    } catch (IllegalArgumentException iea) {
+      
+    }
+  }
+}

Propchange: gora/trunk/gora-accumulo/src/test/java/org/apache/gora/accumulo/store/PartitionTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: gora/trunk/gora-accumulo/src/test/java/org/apache/gora/accumulo/util/HexEncoderTest.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-accumulo/src/test/java/org/apache/gora/accumulo/util/HexEncoderTest.java?rev=1308477&view=auto
==============================================================================
--- gora/trunk/gora-accumulo/src/test/java/org/apache/gora/accumulo/util/HexEncoderTest.java (added)
+++ gora/trunk/gora-accumulo/src/test/java/org/apache/gora/accumulo/util/HexEncoderTest.java Mon Apr  2 19:15:38 2012
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.accumulo.util;
+
+import org.apache.gora.accumulo.encoders.HexEncoder;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * 
+ */
+public class HexEncoderTest {
+  
+  @Test
+  public void testByte() {
+    HexEncoder encoder = new HexEncoder();
+    
+    Assert.assertEquals("12", new String(encoder.encodeByte((byte) 0x12)));
+    Assert.assertEquals("f2", new String(encoder.encodeByte((byte) 0xf2)));
+    
+    byte b = Byte.MIN_VALUE;
+    while (b != Byte.MAX_VALUE) {
+      Assert.assertEquals(b, encoder.decodeByte(encoder.encodeByte(b)));
+      b++;
+    }
+  }
+
+  @Test
+  public void testShort() {
+    HexEncoder encoder = new HexEncoder();
+    
+    Assert.assertEquals("1234", new String(encoder.encodeShort((short) 0x1234)));
+    Assert.assertEquals("f234", new String(encoder.encodeShort((short) 0xf234)));
+    
+    short s = Short.MIN_VALUE;
+    while (s != Short.MAX_VALUE) {
+      Assert.assertEquals(s, encoder.decodeShort(encoder.encodeShort(s)));
+      s++;
+    }
+  }
+}

Propchange: gora/trunk/gora-accumulo/src/test/java/org/apache/gora/accumulo/util/HexEncoderTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: gora/trunk/gora-accumulo/src/test/java/org/apache/gora/accumulo/util/SignedBinaryEncoderTest.java
URL: http://svn.apache.org/viewvc/gora/trunk/gora-accumulo/src/test/java/org/apache/gora/accumulo/util/SignedBinaryEncoderTest.java?rev=1308477&view=auto
==============================================================================
--- gora/trunk/gora-accumulo/src/test/java/org/apache/gora/accumulo/util/SignedBinaryEncoderTest.java (added)
+++ gora/trunk/gora-accumulo/src/test/java/org/apache/gora/accumulo/util/SignedBinaryEncoderTest.java Mon Apr  2 19:15:38 2012
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.accumulo.util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+
+import junit.framework.Assert;
+
+import org.apache.gora.accumulo.encoders.SignedBinaryEncoder;
+import org.apache.hadoop.io.Text;
+import org.junit.Test;
+
+/**
+ * 
+ */
+public class SignedBinaryEncoderTest {
+  @Test
+  public void testShort() {
+    short s = Short.MIN_VALUE;
+    Text prev = null;
+    
+    SignedBinaryEncoder encoder = new SignedBinaryEncoder();
+
+    while (true) {
+      byte[] enc = encoder.encodeShort(s);
+      Assert.assertEquals(s, encoder.decodeShort(enc));
+      Text current = new Text(enc);
+      if (prev != null)
+        Assert.assertTrue(prev.compareTo(current) < 0);
+      prev = current;
+      s++;
+      if (s == Short.MAX_VALUE)
+        break;
+    }
+  }
+
+  private void testInt(int start, int finish) {
+    int i = start;
+    Text prev = null;
+    
+    SignedBinaryEncoder encoder = new SignedBinaryEncoder();
+
+    while (true) {
+      byte[] enc = encoder.encodeInt(i);
+      Assert.assertEquals(i, encoder.decodeInt(enc));
+      Text current = new Text(enc);
+      if (prev != null)
+        Assert.assertTrue(prev.compareTo(current) < 0);
+      prev = current;
+      i++;
+      if (i == finish)
+        break;
+    }
+  }
+  
+  @Test
+  public void testInt() {
+    testInt(Integer.MIN_VALUE, Integer.MIN_VALUE + (1 << 16));
+    testInt(-(1 << 15), (1 << 15));
+    testInt(Integer.MAX_VALUE - (1 << 16), Integer.MAX_VALUE);
+  }
+  
+  private void testLong(long start, long finish) {
+    long l = start;
+    Text prev = null;
+    
+    SignedBinaryEncoder encoder = new SignedBinaryEncoder();
+
+    while (true) {
+      byte[] enc = encoder.encodeLong(l);
+      Assert.assertEquals(l, encoder.decodeLong(enc));
+      Text current = new Text(enc);
+      if (prev != null)
+        Assert.assertTrue(prev.compareTo(current) < 0);
+      prev = current;
+      l++;
+      if (l == finish)
+        break;
+    }
+  }
+  
+  @Test
+  public void testLong() {
+    testLong(Long.MIN_VALUE, Long.MIN_VALUE + (1 << 16));
+    testLong(-(1 << 15), (1 << 15));
+    testLong(Long.MAX_VALUE - (1 << 16), Long.MAX_VALUE);
+  }
+  
+  @Test
+  public void testDouble() {
+    
+    ArrayList<Double> testData = new ArrayList<Double>();
+    testData.add(Double.NEGATIVE_INFINITY);
+    testData.add(Double.MIN_VALUE);
+    testData.add(Math.nextUp(Double.NEGATIVE_INFINITY));
+    testData.add(Math.pow(10.0, 30.0) * -1.0);
+    testData.add(Math.pow(10.0, 30.0));
+    testData.add(Math.pow(10.0, -30.0) * -1.0);
+    testData.add(Math.pow(10.0, -30.0));
+    testData.add(Math.nextAfter(0.0, Double.NEGATIVE_INFINITY));
+    testData.add(0.0);
+    testData.add(Math.nextAfter(Double.MAX_VALUE, Double.NEGATIVE_INFINITY));
+    testData.add(Double.MAX_VALUE);
+    testData.add(Double.POSITIVE_INFINITY);
+    
+    Collections.sort(testData);
+    
+    SignedBinaryEncoder encoder = new SignedBinaryEncoder();
+
+    for (int i = 0; i < testData.size(); i++) {
+      byte[] enc = encoder.encodeDouble(testData.get(i));
+      Assert.assertEquals(testData.get(i), encoder.decodeDouble(enc));
+      if (i > 1) {
+        Assert.assertTrue("Checking " + testData.get(i) + " > " + testData.get(i - 1),
+            new Text(enc).compareTo(new Text(encoder.encodeDouble(testData.get(i - 1)))) > 0);
+      }
+    }
+  }
+
+  @Test
+  public void testFloat() {
+    
+    ArrayList<Float> testData = new ArrayList<Float>();
+    testData.add(Float.NEGATIVE_INFINITY);
+    testData.add(Float.MIN_VALUE);
+    testData.add(Math.nextUp(Float.NEGATIVE_INFINITY));
+    testData.add((float) Math.pow(10.0f, 30.0f) * -1.0f);
+    testData.add((float) Math.pow(10.0f, 30.0f));
+    testData.add((float) Math.pow(10.0f, -30.0f) * -1.0f);
+    testData.add((float) Math.pow(10.0f, -30.0f));
+    testData.add(Math.nextAfter(0.0f, Float.NEGATIVE_INFINITY));
+    testData.add(0.0f);
+    testData.add(Math.nextAfter(Float.MAX_VALUE, Float.NEGATIVE_INFINITY));
+    testData.add(Float.MAX_VALUE);
+    testData.add(Float.POSITIVE_INFINITY);
+    
+    Collections.sort(testData);
+    
+    SignedBinaryEncoder encoder = new SignedBinaryEncoder();
+
+    for (int i = 0; i < testData.size(); i++) {
+      byte[] enc = encoder.encodeFloat(testData.get(i));
+      Assert.assertEquals(testData.get(i), encoder.decodeFloat(enc));
+      if (i > 1) {
+        Assert.assertTrue("Checking " + testData.get(i) + " > " + testData.get(i - 1),
+            new Text(enc).compareTo(new Text(encoder.encodeFloat(testData.get(i - 1)))) > 0);
+      }
+    }
+  }
+
+}

Propchange: gora/trunk/gora-accumulo/src/test/java/org/apache/gora/accumulo/util/SignedBinaryEncoderTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: gora/trunk/gora-accumulo/src/test/resources/gora-accumulo-mapping.xml
URL: http://svn.apache.org/viewvc/gora/trunk/gora-accumulo/src/test/resources/gora-accumulo-mapping.xml?rev=1308477&view=auto
==============================================================================
--- gora/trunk/gora-accumulo/src/test/resources/gora-accumulo-mapping.xml (added)
+++ gora/trunk/gora-accumulo/src/test/resources/gora-accumulo-mapping.xml Mon Apr  2 19:15:38 2012
@@ -0,0 +1,35 @@
+<gora-orm>
+  <table name="AccessLog">
+    <config key="table.file.compress.blocksize" value="32K"/>
+  </table>
+
+  <class name="org.apache.gora.tutorial.log.generated.Pageview" keyClass="java.lang.Long" table="AccessLog">
+    <field name="url" family="common" qualifier="url"/>
+    <field name="timestamp" family="common" qualifier="timestamp"/>
+    <field name="ip" family="common" qualifier="ip" />
+    <field name="httpMethod" family="http" qualifier="httpMethod"/>
+    <field name="httpStatusCode" family="http" qualifier="httpStatusCode"/>
+    <field name="responseSize" family="http" qualifier="responseSize"/>
+    <field name="referrer" family="misc" qualifier="referrer"/>
+    <field name="userAgent" family="misc" qualifier="userAgent"/>
+  </class>
+  
+  <class name="org.apache.gora.examples.generated.Employee" keyClass="java.lang.String" table="Employee">
+    <field name="name" family="info" qualifier="nm"/>
+    <field name="dateOfBirth" family="info" qualifier="db"/>
+    <field name="ssn" family="info" qualifier="sn"/>
+    <field name="salary" family="info" qualifier="sl"/>
+  </class>
+  
+  <class name="org.apache.gora.examples.generated.WebPage" keyClass="java.lang.String" table="WebPage">
+    <field name="url" family="common" qualifier="u"/>
+    <field name="content" family="content" qualifier="c"/>
+    <field name="parsedContent" family="parsedContent"/>
+    <field name="outlinks" family="outlinks"/>
+    <field name="metadata" family="common" qualifier="metadata"/>
+  </class>
+
+  <class name="org.apache.gora.examples.generated.TokenDatum" keyClass="java.lang.String">
+    <field name="count" family="common" qualifier="count"/>
+  </class>  
+</gora-orm>  
\ No newline at end of file

Propchange: gora/trunk/gora-accumulo/src/test/resources/gora-accumulo-mapping.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Added: gora/trunk/gora-accumulo/src/test/resources/gora.properties
URL: http://svn.apache.org/viewvc/gora/trunk/gora-accumulo/src/test/resources/gora.properties?rev=1308477&view=auto
==============================================================================
--- gora/trunk/gora-accumulo/src/test/resources/gora.properties (added)
+++ gora/trunk/gora-accumulo/src/test/resources/gora.properties Mon Apr  2 19:15:38 2012
@@ -0,0 +1,6 @@
+gora.datastore.default=org.apache.gora.accumulo.store.AccumuloStore
+gora.datastore.accumulo.mock=true
+gora.datastore.accumulo.instance=a14
+gora.datastore.accumulo.zookeepers=localhost
+gora.datastore.accumulo.user=root
+gora.datastore.accumulo.password=secret
\ No newline at end of file

Propchange: gora/trunk/gora-accumulo/src/test/resources/gora.properties
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message