Return-Path: X-Original-To: apmail-gora-commits-archive@www.apache.org Delivered-To: apmail-gora-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id ECBBD10903 for ; Sat, 12 Apr 2014 19:22:41 +0000 (UTC) Received: (qmail 44265 invoked by uid 500); 12 Apr 2014 19:22:41 -0000 Delivered-To: apmail-gora-commits-archive@gora.apache.org Received: (qmail 44195 invoked by uid 500); 12 Apr 2014 19:22:39 -0000 Mailing-List: contact commits-help@gora.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@gora.apache.org Delivered-To: mailing list commits@gora.apache.org Received: (qmail 44089 invoked by uid 99); 12 Apr 2014 19:22:37 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 12 Apr 2014 19:22:37 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 12 Apr 2014 19:22:26 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id C4EE12388A67; Sat, 12 Apr 2014 19:22:01 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1586888 [2/10] - in /gora/trunk: ./ bin/ gora-accumulo/ gora-accumulo/src/main/java/org/apache/gora/accumulo/encoders/ gora-accumulo/src/main/java/org/apache/gora/accumulo/query/ gora-accumulo/src/main/java/org/apache/gora/accumulo/store/ ... 
Date: Sat, 12 Apr 2014 19:21:56 -0000 To: commits@gora.apache.org From: rmarroquin@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140412192201.C4EE12388A67@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CharSequenceSerializer.java URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CharSequenceSerializer.java?rev=1586888&view=auto ============================================================================== --- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CharSequenceSerializer.java (added) +++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CharSequenceSerializer.java Sat Apr 12 19:21:53 2014 @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gora.cassandra.serializers; + +import static me.prettyprint.hector.api.ddl.ComparatorType.UTF8TYPE; + +import java.nio.ByteBuffer; + +import org.apache.avro.util.Utf8; + +import me.prettyprint.cassandra.serializers.AbstractSerializer; +import me.prettyprint.cassandra.serializers.StringSerializer; +import me.prettyprint.hector.api.ddl.ComparatorType; + +/** + * A CharSequenceSerializer translates the byte[] to and from CharSequence object of Avro. + */ +public final class CharSequenceSerializer extends AbstractSerializer { + + private static final CharSequenceSerializer instance = new CharSequenceSerializer(); + + public static CharSequenceSerializer get() { + return instance; + } + + @Override + public ByteBuffer toByteBuffer(CharSequence obj) { + if (obj == null) { + return null; + } + return StringSerializer.get().toByteBuffer(obj.toString()); + } + + @Override + //TODO: Returning CharSequence causes the tests to fail because all tests currently expect Utf8. Once the tests' expected type is changed, this return type will become CharSequence. + public Utf8 fromByteBuffer(ByteBuffer byteBuffer) { + if (byteBuffer == null) { + return null; + } + return new Utf8(StringSerializer.get().fromByteBuffer(byteBuffer)); + } + + @Override + public ComparatorType getComparatorType() { + return UTF8TYPE; + } + +} Propchange: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/CharSequenceSerializer.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GenericArraySerializer.java URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GenericArraySerializer.java?rev=1586888&r1=1586887&r2=1586888&view=diff ============================================================================== --- 
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GenericArraySerializer.java (original) +++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GenericArraySerializer.java Sat Apr 12 19:21:53 2014 @@ -1,199 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gora.cassandra.serializers; - -import java.nio.BufferUnderflowException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import me.prettyprint.cassandra.serializers.AbstractSerializer; -import me.prettyprint.cassandra.serializers.BytesArraySerializer; -import me.prettyprint.cassandra.serializers.IntegerSerializer; -import me.prettyprint.hector.api.Serializer; -import me.prettyprint.hector.api.ddl.ComparatorType; -import static me.prettyprint.hector.api.ddl.ComparatorType.UTF8TYPE; - -import org.apache.avro.Schema; -import org.apache.avro.Schema.Type; -import org.apache.avro.generic.GenericArray; -import org.apache.avro.specific.SpecificFixed; -import org.apache.avro.util.Utf8; -import org.apache.gora.persistency.ListGenericArray; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A GenericArraySerializer translates the byte[] to and from GenericArray of Avro. 
- */ -public class GenericArraySerializer extends AbstractSerializer> { - - public static final Logger LOG = LoggerFactory.getLogger(GenericArraySerializer.class); - - private static Map elementTypeToSerializerMap = new HashMap(); - private static Map fixedClassToSerializerMap = new HashMap(); - - public static GenericArraySerializer get(Type elementType) { - GenericArraySerializer serializer = elementTypeToSerializerMap.get(elementType); - if (serializer == null) { - serializer = new GenericArraySerializer(elementType); - elementTypeToSerializerMap.put(elementType, serializer); - } - return serializer; - } - - public static GenericArraySerializer get(Type elementType, Class clazz) { - if (elementType != Type.FIXED) { - return null; - } - GenericArraySerializer serializer = elementTypeToSerializerMap.get(clazz); - if (serializer == null) { - serializer = new GenericArraySerializer(clazz); - fixedClassToSerializerMap.put(clazz, serializer); - } - return serializer; - } - - public static GenericArraySerializer get(Schema elementSchema) { - Type type = elementSchema.getType(); - if (type == Type.FIXED) { - return get(Type.FIXED, TypeUtils.getClass(elementSchema)); - } else { - return get(type); - } - } - - private Schema elementSchema = null; - private Type elementType = null; - private int size = -1; - private Class clazz = null; - private Serializer elementSerializer = null; - - public GenericArraySerializer(Serializer elementSerializer) { - this.elementSerializer = elementSerializer; - } - - public GenericArraySerializer(Schema elementSchema) { - this.elementSchema = elementSchema; - elementType = elementSchema.getType(); - size = TypeUtils.getFixedSize(elementSchema); - elementSerializer = GoraSerializerTypeInferer.getSerializer(elementSchema); - } - - public GenericArraySerializer(Type elementType) { - this.elementType = elementType; - if (elementType != Type.FIXED) { - elementSchema = Schema.create(elementType); - } - clazz = TypeUtils.getClass(elementType); - 
size = TypeUtils.getFixedSize(elementType); - elementSerializer = GoraSerializerTypeInferer.getSerializer(elementType); - } - - public GenericArraySerializer(Class clazz) { - this.clazz = clazz; - elementType = TypeUtils.getType(clazz); - size = TypeUtils.getFixedSize(clazz); - if (elementType == null || elementType == Type.FIXED) { - elementType = Type.FIXED; - elementSchema = TypeUtils.getSchema(clazz); - elementSerializer = GoraSerializerTypeInferer.getSerializer(elementType, clazz); - } else { - elementSerializer = GoraSerializerTypeInferer.getSerializer(elementType); - } - } - - @Override - public ByteBuffer toByteBuffer(GenericArray array) { - if (array == null) { - return null; - } - if (size > 0) { - return toByteBufferWithFixedLengthElements(array); - } else { - return toByteBufferWithVariableLengthElements(array); - } - } - - private ByteBuffer toByteBufferWithFixedLengthElements(GenericArray array) { - ByteBuffer byteBuffer = ByteBuffer.allocate((int) array.size() * size); - for (T element : array) { - byteBuffer.put(elementSerializer.toByteBuffer(element)); - } - byteBuffer.rewind(); - return byteBuffer; - } - - private ByteBuffer toByteBufferWithVariableLengthElements(GenericArray array) { - int n = (int) array.size(); - List list = new ArrayList(n); - n *= 4; - for (T element : array) { - byte[] bytes = BytesArraySerializer.get().fromByteBuffer(elementSerializer.toByteBuffer(element)); - list.add(bytes); - n += bytes.length; - } - ByteBuffer byteBuffer = ByteBuffer.allocate(n); - for (byte[] bytes : list) { - byteBuffer.put(IntegerSerializer.get().toByteBuffer(bytes.length)); - byteBuffer.put(BytesArraySerializer.get().toByteBuffer(bytes)); - } - byteBuffer.rewind(); - return byteBuffer; - } - - @Override - public GenericArray fromByteBuffer(ByteBuffer byteBuffer) { - if (byteBuffer == null) { - return null; - } - GenericArray array = new ListGenericArray(elementSchema); -int i = 0; - while (true) { - T element = null; - try { - if (size > 0) { - 
element = elementSerializer.fromByteBuffer(byteBuffer); - } - else { - int n = IntegerSerializer.get().fromByteBuffer(byteBuffer); - byte[] bytes = new byte[n]; - byteBuffer.get(bytes, 0, n); - element = elementSerializer.fromByteBuffer( BytesArraySerializer.get().toByteBuffer(bytes) ); - } - } catch (BufferUnderflowException e) { - break; - } - if (element == null) { - break; - } - array.add(element); - } - return array; - } - - @Override - public ComparatorType getComparatorType() { - return elementSerializer.getComparatorType(); - } - -} Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java?rev=1586888&r1=1586887&r2=1586888&view=diff ============================================================================== --- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java (original) +++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/GoraSerializerTypeInferer.java Sat Apr 12 19:21:53 2014 @@ -19,16 +19,18 @@ package org.apache.gora.cassandra.serializers; import java.nio.ByteBuffer; +import java.util.Map; -import me.prettyprint.cassandra.serializers.BytesArraySerializer; -import me.prettyprint.cassandra.serializers.ByteBufferSerializer; import me.prettyprint.cassandra.serializers.BooleanSerializer; +import me.prettyprint.cassandra.serializers.ByteBufferSerializer; +import me.prettyprint.cassandra.serializers.BytesArraySerializer; import me.prettyprint.cassandra.serializers.DoubleSerializer; import me.prettyprint.cassandra.serializers.FloatSerializer; import me.prettyprint.cassandra.serializers.IntegerSerializer; import me.prettyprint.cassandra.serializers.LongSerializer; -import me.prettyprint.cassandra.serializers.StringSerializer; +import 
me.prettyprint.cassandra.serializers.ObjectSerializer; import me.prettyprint.cassandra.serializers.SerializerTypeInferer; +import me.prettyprint.cassandra.serializers.StringSerializer; import me.prettyprint.hector.api.Serializer; import org.apache.avro.Schema; @@ -36,9 +38,7 @@ import org.apache.avro.Schema.Type; import org.apache.avro.generic.GenericArray; import org.apache.avro.specific.SpecificFixed; import org.apache.avro.util.Utf8; - -import org.apache.gora.persistency.StatefulHashMap; - +import org.apache.gora.persistency.Persistent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,7 +56,7 @@ public class GoraSerializerTypeInferer { if (value == null) { serializer = ByteBufferSerializer.get(); } else if (value instanceof Utf8) { - serializer = Utf8Serializer.get(); + serializer = CharSequenceSerializer.get(); } else if (value instanceof Boolean) { serializer = BooleanSerializer.get(); } else if (value instanceof ByteBuffer) { @@ -80,18 +80,21 @@ public class GoraSerializerTypeInferer { if (schema.getType() == Type.ARRAY) { schema = schema.getElementType(); } - serializer = GenericArraySerializer.get(schema); - } else if (value instanceof StatefulHashMap) { - StatefulHashMap map = (StatefulHashMap)value; + serializer = ListSerializer.get(schema); + } else if (value instanceof Map) { + Map map = (Map)value; if (map.size() == 0) { serializer = ByteBufferSerializer.get(); } else { Object value0 = map.values().iterator().next(); Schema schema = TypeUtils.getSchema(value0); - serializer = StatefulHashMapSerializer.get(schema); + serializer = MapSerializer.get(schema); } - } else { + } else if (value instanceof Persistent){ + serializer = ObjectSerializer.get(); + } + else { serializer = SerializerTypeInferer.getSerializer(value); } return serializer; @@ -101,7 +104,7 @@ public class GoraSerializerTypeInferer { public static Serializer getSerializer(Class valueClass) { Serializer serializer = null; if (valueClass.equals(Utf8.class)) { - serializer = 
Utf8Serializer.get(); + serializer = CharSequenceSerializer.get(); } else if (valueClass.equals(Boolean.class) || valueClass.equals(boolean.class)) { serializer = BooleanSerializer.get(); } else if (valueClass.equals(ByteBuffer.class)) { @@ -126,30 +129,32 @@ public class GoraSerializerTypeInferer { public static Serializer getSerializer(Schema schema) { Serializer serializer = null; Type type = schema.getType(); - if (type == Type.STRING) { - serializer = Utf8Serializer.get(); - } else if (type == Type.BOOLEAN) { + if (type.equals(Type.STRING)) { + serializer = CharSequenceSerializer.get(); + } else if (type.equals(Type.BOOLEAN)) { serializer = BooleanSerializer.get(); - } else if (type == Type.BYTES) { + } else if (type.equals(Type.BYTES)) { serializer = ByteBufferSerializer.get(); - } else if (type == Type.DOUBLE) { + } else if (type.equals(Type.DOUBLE)) { serializer = DoubleSerializer.get(); - } else if (type == Type.FLOAT) { + } else if (type.equals(Type.FLOAT)) { serializer = FloatSerializer.get(); - } else if (type == Type.INT) { + } else if (type.equals(Type.INT)) { serializer = IntegerSerializer.get(); - } else if (type == Type.LONG) { + } else if (type.equals(Type.LONG)) { serializer = LongSerializer.get(); - } else if (type == Type.FIXED) { + } else if (type.equals(Type.FIXED)) { Class clazz = TypeUtils.getClass(schema); serializer = SpecificFixedSerializer.get(clazz); // serializer = SpecificFixedSerializer.get(schema); - } else if (type == Type.ARRAY) { - serializer = GenericArraySerializer.get(schema.getElementType()); - } else if (type == Type.MAP) { - serializer = StatefulHashMapSerializer.get(schema.getValueType()); - } else if (type == Type.UNION){ + } else if (type.equals(Type.ARRAY)) { + serializer = ListSerializer.get(schema.getElementType()); + } else if (type.equals(Type.MAP)) { + serializer = MapSerializer.get(schema.getValueType()); + } else if (type.equals(Type.UNION)){ serializer = ByteBufferSerializer.get(); + } else if 
(type.equals(Type.RECORD)){ + serializer = BytesArraySerializer.get(); } else { serializer = null; } @@ -160,7 +165,7 @@ public class GoraSerializerTypeInferer { public static Serializer getSerializer(Type type) { Serializer serializer = null; if (type == Type.STRING) { - serializer = Utf8Serializer.get(); + serializer = CharSequenceSerializer.get(); } else if (type == Type.BOOLEAN) { serializer = BooleanSerializer.get(); } else if (type == Type.BYTES) { @@ -197,9 +202,9 @@ public class GoraSerializerTypeInferer { } if (type == Type.ARRAY) { - serializer = GenericArraySerializer.get(elementType); + serializer = ListSerializer.get(elementType); } else if (type == Type.MAP) { - serializer = StatefulHashMapSerializer.get(elementType); + serializer = MapSerializer.get(elementType); } else { serializer = null; } Added: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/ListSerializer.java URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/ListSerializer.java?rev=1586888&view=auto ============================================================================== --- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/ListSerializer.java (added) +++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/ListSerializer.java Sat Apr 12 19:21:53 2014 @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.cassandra.serializers; + +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import me.prettyprint.cassandra.serializers.AbstractSerializer; +import me.prettyprint.cassandra.serializers.BytesArraySerializer; +import me.prettyprint.cassandra.serializers.IntegerSerializer; +import me.prettyprint.hector.api.Serializer; +import me.prettyprint.hector.api.ddl.ComparatorType; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A ListSerializer translates the byte[] to and from List of Avro. 
+ */ +public class ListSerializer extends AbstractSerializer> { + + public static final Logger LOG = LoggerFactory.getLogger(ListSerializer.class); + + private static Map elementTypeToSerializerMap = new HashMap(); + private static Map fixedClassToSerializerMap = new HashMap(); + + public static ListSerializer get(Type elementType) { + ListSerializer serializer = elementTypeToSerializerMap.get(elementType); + if (serializer == null) { + serializer = new ListSerializer(elementType); + elementTypeToSerializerMap.put(elementType, serializer); + } + return serializer; + } + + public static ListSerializer get(Type elementType, Class clazz) { + if (elementType != Type.FIXED) { + return null; + } + ListSerializer serializer = elementTypeToSerializerMap.get(clazz); + if (serializer == null) { + serializer = new ListSerializer(clazz); + fixedClassToSerializerMap.put(clazz, serializer); + } + return serializer; + } + + public static ListSerializer get(Schema elementSchema) { + Type type = elementSchema.getType(); + if (type == Type.FIXED) { + return get(Type.FIXED, TypeUtils.getClass(elementSchema)); + } else { + return get(type); + } + } + + private Schema elementSchema = null; + private Type elementType = null; + private int size = -1; + private Class clazz = null; + private Serializer elementSerializer = null; + + public ListSerializer(Serializer elementSerializer) { + this.elementSerializer = elementSerializer; + } + + public ListSerializer(Schema elementSchema) { + this.elementSchema = elementSchema; + elementType = elementSchema.getType(); + size = TypeUtils.getFixedSize(elementSchema); + elementSerializer = GoraSerializerTypeInferer.getSerializer(elementSchema); + } + + @SuppressWarnings("unchecked") + public ListSerializer(Type elementType) { + this.elementType = elementType; + if (elementType != Type.FIXED) { + elementSchema = Schema.create(elementType); + } + clazz = (Class) TypeUtils.getClass(elementType); + size = TypeUtils.getFixedSize(elementType); + 
elementSerializer = GoraSerializerTypeInferer.getSerializer(elementType); + } + + public ListSerializer(Class clazz) { + this.clazz = clazz; + elementType = TypeUtils.getType(clazz); + size = TypeUtils.getFixedSize(clazz); + if (elementType == null || elementType == Type.FIXED) { + elementType = Type.FIXED; + elementSchema = TypeUtils.getSchema(clazz); + elementSerializer = GoraSerializerTypeInferer.getSerializer(elementType, clazz); + } else { + elementSerializer = GoraSerializerTypeInferer.getSerializer(elementType); + } + } + + @Override + public ByteBuffer toByteBuffer(List array) { + if (array == null) { + return null; + } + if (size > 0) { + return toByteBufferWithFixedLengthElements(array); + } else { + return toByteBufferWithVariableLengthElements(array); + } + } + + private ByteBuffer toByteBufferWithFixedLengthElements(List array) { + ByteBuffer byteBuffer = ByteBuffer.allocate((int) array.size() * size); + for (T element : array) { + byteBuffer.put(elementSerializer.toByteBuffer(element)); + } + byteBuffer.rewind(); + return byteBuffer; + } + + private ByteBuffer toByteBufferWithVariableLengthElements(List array) { + int n = (int) array.size(); + List list = new ArrayList(n); + n *= 4; + for (T element : array) { + byte[] bytes = BytesArraySerializer.get().fromByteBuffer(elementSerializer.toByteBuffer(element)); + list.add(bytes); + n += bytes.length; + } + ByteBuffer byteBuffer = ByteBuffer.allocate(n); + for (byte[] bytes : list) { + byteBuffer.put(IntegerSerializer.get().toByteBuffer(bytes.length)); + byteBuffer.put(BytesArraySerializer.get().toByteBuffer(bytes)); + } + byteBuffer.rewind(); + return byteBuffer; + } + + @Override + public List fromByteBuffer(ByteBuffer byteBuffer) { + if (byteBuffer == null) { + return null; + } + ArrayList array = new ArrayList(); + while (true) { + T element = null; + try { + if (size > 0) { + element = elementSerializer.fromByteBuffer(byteBuffer); + } + else { + int n = 
IntegerSerializer.get().fromByteBuffer(byteBuffer); + byte[] bytes = new byte[n]; + byteBuffer.get(bytes, 0, n); + element = elementSerializer.fromByteBuffer( BytesArraySerializer.get().toByteBuffer(bytes) ); + } + } catch (BufferUnderflowException e) { + break; + } + if (element == null) { + break; + } + array.add(element); + } + return array; + } + + @Override + public ComparatorType getComparatorType() { + return elementSerializer.getComparatorType(); + } + +} Propchange: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/ListSerializer.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/MapSerializer.java URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/MapSerializer.java?rev=1586888&view=auto ============================================================================== --- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/MapSerializer.java (added) +++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/MapSerializer.java Sat Apr 12 19:21:53 2014 @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.cassandra.serializers; + +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import me.prettyprint.cassandra.serializers.AbstractSerializer; +import me.prettyprint.cassandra.serializers.BytesArraySerializer; +import me.prettyprint.cassandra.serializers.IntegerSerializer; +import me.prettyprint.hector.api.Serializer; +import me.prettyprint.hector.api.ddl.ComparatorType; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A MapSerializer translates the byte[] to and from Map of Avro. + */ +public class MapSerializer extends AbstractSerializer> { + + public static final Logger LOG = LoggerFactory.getLogger(MapSerializer.class); + + private static Map valueTypeToSerializerMap = new HashMap(); + private static Map fixedClassToSerializerMap = new HashMap(); + + public static MapSerializer get(Type valueType) { + MapSerializer serializer = valueTypeToSerializerMap.get(valueType); + if (serializer == null) { + serializer = new MapSerializer(valueType); + valueTypeToSerializerMap.put(valueType, serializer); + } + return serializer; + } + + public static MapSerializer get(Type valueType, Class clazz) { + if (valueType != Type.FIXED) { + return null; + } + MapSerializer serializer = valueTypeToSerializerMap.get(clazz); + if (serializer == null) { + serializer = new MapSerializer(clazz); + fixedClassToSerializerMap.put(clazz, serializer); + } + return serializer; + } + + public static MapSerializer get(Schema valueSchema) { + Type type = valueSchema.getType(); + if (type == Type.FIXED) { + return get(Type.FIXED, TypeUtils.getClass(valueSchema)); + } else { + return get(type); + } + } + + private Schema valueSchema = null; + 
private Type valueType = null; + private int size = -1; + private Class clazz = null; + private Serializer valueSerializer = null; + + public MapSerializer(Serializer valueSerializer) { + this.valueSerializer = valueSerializer; + } + + public MapSerializer(Schema valueSchema) { + this.valueSchema = valueSchema; + valueType = valueSchema.getType(); + size = TypeUtils.getFixedSize(valueSchema); + valueSerializer = GoraSerializerTypeInferer.getSerializer(valueSchema); + } + + @SuppressWarnings("unchecked") + public MapSerializer(Type valueType) { + this.valueType = valueType; + if (valueType != Type.FIXED) { + valueSchema = Schema.create(valueType); + } + clazz = (Class) TypeUtils.getClass(valueType); + size = TypeUtils.getFixedSize(valueType); + valueSerializer = GoraSerializerTypeInferer.getSerializer(valueType); + } + + public MapSerializer(Class clazz) { + this.clazz = clazz; + valueType = TypeUtils.getType(clazz); + size = TypeUtils.getFixedSize(clazz); + if (valueType == null || valueType == Type.FIXED) { + valueType = Type.FIXED; + valueSchema = TypeUtils.getSchema(clazz); + valueSerializer = GoraSerializerTypeInferer.getSerializer(valueType, clazz); + } else { + valueSerializer = GoraSerializerTypeInferer.getSerializer(valueType); + } + } + + @Override + public ByteBuffer toByteBuffer(Map map) { + if (map == null) { + return null; + } + if (size > 0) { + return toByteBufferWithFixedLengthElements(map); + } else { + return toByteBufferWithVariableLengthElements(map); + } + } + + private ByteBuffer toByteBufferWithFixedLengthElements(Map map) { + int n = (int) map.size(); + List list = new ArrayList(n); + n *= 4; + for (CharSequence key : map.keySet()) { + T value = map.get(key); + byte[] bytes = BytesArraySerializer.get().fromByteBuffer(CharSequenceSerializer.get().toByteBuffer(key)); + list.add(bytes); + n += bytes.length; + bytes = BytesArraySerializer.get().fromByteBuffer(valueSerializer.toByteBuffer(value)); + list.add(bytes); + n += bytes.length; + } + 
ByteBuffer byteBuffer = ByteBuffer.allocate(n); + int i = 0; + for (byte[] bytes : list) { + if (i % 2 == 0) { + byteBuffer.put(IntegerSerializer.get().toByteBuffer(bytes.length)); + } + byteBuffer.put(BytesArraySerializer.get().toByteBuffer(bytes)); + i += 1; + } + byteBuffer.rewind(); + return byteBuffer; + } + + private ByteBuffer toByteBufferWithVariableLengthElements(Map map) { + int n = (int) map.size(); + List list = new ArrayList(n); + n *= 8; + for (CharSequence key : map.keySet()) { + T value = map.get(key); + byte[] bytes = BytesArraySerializer.get().fromByteBuffer(CharSequenceSerializer.get().toByteBuffer(key)); + list.add(bytes); + n += bytes.length; + bytes = BytesArraySerializer.get().fromByteBuffer(valueSerializer.toByteBuffer(value)); + list.add(bytes); + n += bytes.length; + } + ByteBuffer byteBuffer = ByteBuffer.allocate(n); + for (byte[] bytes : list) { + byteBuffer.put(IntegerSerializer.get().toByteBuffer(bytes.length)); + byteBuffer.put(BytesArraySerializer.get().toByteBuffer(bytes)); + } + byteBuffer.rewind(); + return byteBuffer; + } + + @Override + public Map fromByteBuffer(ByteBuffer byteBuffer) { + if (byteBuffer == null) { + return null; + } + Map map = new HashMap(); + while (true) { + CharSequence key = null; + T value = null; + try { + int n = IntegerSerializer.get().fromByteBuffer(byteBuffer); + byte[] bytes = new byte[n]; + byteBuffer.get(bytes, 0, n); + key = CharSequenceSerializer.get().fromByteBuffer( BytesArraySerializer.get().toByteBuffer(bytes) ); + + if (size > 0) { + value = valueSerializer.fromByteBuffer(byteBuffer); + } + else { + n = IntegerSerializer.get().fromByteBuffer(byteBuffer); + bytes = new byte[n]; + byteBuffer.get(bytes, 0, n); + value = valueSerializer.fromByteBuffer( BytesArraySerializer.get().toByteBuffer(bytes) ); + } + } catch (BufferUnderflowException e) { + break; + } + if (key == null) { + break; + } + if (value == null) { + break; + } + map.put(key, value); + } + return map; + } + + @Override + public 
ComparatorType getComparatorType() { + return valueSerializer.getComparatorType(); + } + +} \ No newline at end of file Propchange: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/MapSerializer.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/SpecificFixedSerializer.java URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/SpecificFixedSerializer.java?rev=1586888&r1=1586887&r2=1586888&view=diff ============================================================================== --- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/SpecificFixedSerializer.java (original) +++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/SpecificFixedSerializer.java Sat Apr 12 19:21:53 2014 @@ -25,15 +25,10 @@ import java.util.Map; import me.prettyprint.cassandra.serializers.AbstractSerializer; import me.prettyprint.cassandra.serializers.BytesArraySerializer; -import me.prettyprint.hector.api.Serializer; import me.prettyprint.hector.api.ddl.ComparatorType; import static me.prettyprint.hector.api.ddl.ComparatorType.BYTESTYPE; import org.apache.avro.specific.SpecificFixed; -import org.apache.avro.Schema; -import org.apache.avro.Schema.Type; -import org.apache.avro.util.Utf8; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/StatefulHashMapSerializer.java URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/StatefulHashMapSerializer.java?rev=1586888&r1=1586887&r2=1586888&view=diff ============================================================================== --- 
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/StatefulHashMapSerializer.java (original) +++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/StatefulHashMapSerializer.java Sat Apr 12 19:21:53 2014 @@ -1,236 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gora.cassandra.serializers; - -import java.nio.BufferUnderflowException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import me.prettyprint.cassandra.serializers.AbstractSerializer; -import me.prettyprint.cassandra.serializers.BytesArraySerializer; -import me.prettyprint.cassandra.serializers.IntegerSerializer; -import me.prettyprint.hector.api.Serializer; -import me.prettyprint.hector.api.ddl.ComparatorType; -import static me.prettyprint.hector.api.ddl.ComparatorType.UTF8TYPE; - -import org.apache.avro.Schema; -import org.apache.avro.Schema.Type; -import org.apache.avro.specific.SpecificFixed; -import org.apache.avro.util.Utf8; -import org.apache.gora.persistency.State; -import org.apache.gora.persistency.StatefulHashMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A StatefulHashMapSerializer translates the byte[] to and from StatefulHashMap of Avro. 
- */ -public class StatefulHashMapSerializer extends AbstractSerializer> { - - public static final Logger LOG = LoggerFactory.getLogger(StatefulHashMapSerializer.class); - - private static Map valueTypeToSerializerMap = new HashMap(); - private static Map fixedClassToSerializerMap = new HashMap(); - - public static StatefulHashMapSerializer get(Type valueType) { - StatefulHashMapSerializer serializer = valueTypeToSerializerMap.get(valueType); - if (serializer == null) { - serializer = new StatefulHashMapSerializer(valueType); - valueTypeToSerializerMap.put(valueType, serializer); - } - return serializer; - } - - public static StatefulHashMapSerializer get(Type valueType, Class clazz) { - if (valueType != Type.FIXED) { - return null; - } - StatefulHashMapSerializer serializer = valueTypeToSerializerMap.get(clazz); - if (serializer == null) { - serializer = new StatefulHashMapSerializer(clazz); - fixedClassToSerializerMap.put(clazz, serializer); - } - return serializer; - } - - public static StatefulHashMapSerializer get(Schema valueSchema) { - Type type = valueSchema.getType(); - if (type == Type.FIXED) { - return get(Type.FIXED, TypeUtils.getClass(valueSchema)); - } else { - return get(type); - } - } - - private Schema valueSchema = null; - private Type valueType = null; - private int size = -1; - private Class clazz = null; - private Serializer valueSerializer = null; - - public StatefulHashMapSerializer(Serializer valueSerializer) { - this.valueSerializer = valueSerializer; - } - - public StatefulHashMapSerializer(Schema valueSchema) { - this.valueSchema = valueSchema; - valueType = valueSchema.getType(); - size = TypeUtils.getFixedSize(valueSchema); - valueSerializer = GoraSerializerTypeInferer.getSerializer(valueSchema); - } - - public StatefulHashMapSerializer(Type valueType) { - this.valueType = valueType; - if (valueType != Type.FIXED) { - valueSchema = Schema.create(valueType); - } - clazz = TypeUtils.getClass(valueType); - size = 
TypeUtils.getFixedSize(valueType); - valueSerializer = GoraSerializerTypeInferer.getSerializer(valueType); - } - - public StatefulHashMapSerializer(Class clazz) { - this.clazz = clazz; - valueType = TypeUtils.getType(clazz); - size = TypeUtils.getFixedSize(clazz); - if (valueType == null || valueType == Type.FIXED) { - valueType = Type.FIXED; - valueSchema = TypeUtils.getSchema(clazz); - valueSerializer = GoraSerializerTypeInferer.getSerializer(valueType, clazz); - } else { - valueSerializer = GoraSerializerTypeInferer.getSerializer(valueType); - } - } - - @Override - public ByteBuffer toByteBuffer(StatefulHashMap map) { - if (map == null) { - return null; - } - if (size > 0) { - return toByteBufferWithFixedLengthElements(map); - } else { - return toByteBufferWithVariableLengthElements(map); - } - } - - private ByteBuffer toByteBufferWithFixedLengthElements(StatefulHashMap map) { - List list = new ArrayList(map.size()); - int n = 0; - for (Utf8 key : map.keySet()) { - if (map.getState(key) == State.DELETED) { - continue; - } - T value = map.get(key); - byte[] bytes = BytesArraySerializer.get().fromByteBuffer(Utf8Serializer.get().toByteBuffer(key)); - list.add(bytes); - n += 4; - n += bytes.length; - bytes = BytesArraySerializer.get().fromByteBuffer(valueSerializer.toByteBuffer(value)); - list.add(bytes); - n += bytes.length; - } - ByteBuffer byteBuffer = ByteBuffer.allocate(n); - int i = 0; - for (byte[] bytes : list) { - if (i % 2 == 0) { - byteBuffer.put(IntegerSerializer.get().toByteBuffer(bytes.length)); - } - byteBuffer.put(BytesArraySerializer.get().toByteBuffer(bytes)); - i += 1; - } - byteBuffer.rewind(); - return byteBuffer; - } - - private ByteBuffer toByteBufferWithVariableLengthElements(StatefulHashMap map) { - List list = new ArrayList(map.size()); - int n = 0; - for (Utf8 key : map.keySet()) { - if (map.getState(key) == State.DELETED) { - continue; - } - T value = map.get(key); - byte[] bytes = 
BytesArraySerializer.get().fromByteBuffer(Utf8Serializer.get().toByteBuffer(key)); - list.add(bytes); - n += 4; - n += bytes.length; - bytes = BytesArraySerializer.get().fromByteBuffer(valueSerializer.toByteBuffer(value)); - list.add(bytes); - n += 4; - n += bytes.length; - } - ByteBuffer byteBuffer = ByteBuffer.allocate(n); - for (byte[] bytes : list) { - byteBuffer.put(IntegerSerializer.get().toByteBuffer(bytes.length)); - byteBuffer.put(BytesArraySerializer.get().toByteBuffer(bytes)); - } - byteBuffer.rewind(); - return byteBuffer; - } - - @Override - public StatefulHashMap fromByteBuffer(ByteBuffer byteBuffer) { - if (byteBuffer == null) { - return null; - } - StatefulHashMap map = new StatefulHashMap(); -int i = 0; - while (true) { - Utf8 key = null; - T value = null; - try { - int n = IntegerSerializer.get().fromByteBuffer(byteBuffer); - byte[] bytes = new byte[n]; - byteBuffer.get(bytes, 0, n); - key = Utf8Serializer.get().fromByteBuffer( BytesArraySerializer.get().toByteBuffer(bytes) ); - - if (size > 0) { - value = valueSerializer.fromByteBuffer(byteBuffer); - } - else { - n = IntegerSerializer.get().fromByteBuffer(byteBuffer); - bytes = new byte[n]; - byteBuffer.get(bytes, 0, n); - value = valueSerializer.fromByteBuffer( BytesArraySerializer.get().toByteBuffer(bytes) ); - } - } catch (BufferUnderflowException e) { - break; - } - if (key == null) { - break; - } - if (value == null) { - break; - } - map.put(key, value); - } - return map; - } - - @Override - public ComparatorType getComparatorType() { - return valueSerializer.getComparatorType(); - } - -} Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java?rev=1586888&r1=1586887&r2=1586888&view=diff ============================================================================== --- 
gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java (original) +++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/TypeUtils.java Sat Apr 12 19:21:53 2014 @@ -19,16 +19,15 @@ package org.apache.gora.cassandra.serializers; import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; import org.apache.avro.Schema; import org.apache.avro.Schema.Type; import org.apache.avro.generic.GenericArray; import org.apache.avro.specific.SpecificFixed; import org.apache.avro.util.Utf8; -import org.apache.gora.persistency.ListGenericArray; import org.apache.gora.persistency.Persistent; -import org.apache.gora.persistency.StatefulHashMap; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,13 +40,13 @@ public class TypeUtils { public static final Logger LOG = LoggerFactory.getLogger(TypeUtils.class); // @SuppressWarnings({ "rawtypes", "unchecked" }) - public static Class getClass(Object value) { + public static Class getClass(Object value) { return value.getClass(); } public static Schema getSchema(Object value) { if (value instanceof GenericArray) { - return Schema.createArray( getElementSchema((GenericArray)value) ); + return Schema.createArray( getElementSchema((GenericArray)value) ); } else { return getSchema( getClass(value) ); } @@ -72,9 +71,9 @@ public class TypeUtils { return Type.INT; } else if (clazz.equals(Long.class) || clazz.equals(long.class)) { return Type.LONG; - } else if (clazz.equals(ListGenericArray.class)) { + } else if (clazz.isAssignableFrom(List.class)) { return Type.ARRAY; - } else if (clazz.equals(StatefulHashMap.class)) { + } else if (clazz.isAssignableFrom(Map.class)) { return Type.MAP; } else if (clazz.equals(Persistent.class)) { return Type.RECORD; @@ -85,7 +84,7 @@ public class TypeUtils { } } - public static Class getClass(Type type) { + public static Class getClass(Type type) { if (type == Type.STRING) { return Utf8.class; } else if (type == 
Type.BOOLEAN) { @@ -101,9 +100,9 @@ public class TypeUtils { } else if (type == Type.LONG) { return Long.class; } else if (type == Type.ARRAY) { - return ListGenericArray.class; + return List.class; } else if (type == Type.MAP) { - return StatefulHashMap.class; + return Map.class; } else if (type == Type.RECORD) { return Persistent.class; } else if (type == Type.FIXED) { @@ -114,7 +113,7 @@ public class TypeUtils { } } - public static Schema getSchema(Class clazz) { + public static Schema getSchema(Class clazz) { Type type = getType(clazz); if (type == null) { return null; @@ -157,7 +156,7 @@ public class TypeUtils { } } - public static Class getClass(Schema schema) { + public static Class getClass(Schema schema) { Type type = schema.getType(); if (type == null) { return null; @@ -198,7 +197,7 @@ public class TypeUtils { } } - public static int getFixedSize(Class clazz) { + public static int getFixedSize(Class clazz) { Type type = getType(clazz); if (type == Type.FIXED) { try { @@ -215,15 +214,11 @@ public class TypeUtils { } } - public static Schema getElementSchema(GenericArray array) { + public static Schema getElementSchema(GenericArray array) { Schema schema = array.getSchema(); return (schema.getType() == Type.ARRAY) ? 
schema.getElementType() : schema; } - public static Type getElementType(ListGenericArray array) { - return getElementSchema(array).getType(); - } - /* public static Schema getValueSchema(StatefulHashMap map) { return map.getSchema().getValueType(); Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/Utf8Serializer.java URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/Utf8Serializer.java?rev=1586888&r1=1586887&r2=1586888&view=diff ============================================================================== --- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/Utf8Serializer.java (original) +++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/serializers/Utf8Serializer.java Sat Apr 12 19:21:53 2014 @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gora.cassandra.serializers; - -import java.nio.ByteBuffer; - -import me.prettyprint.cassandra.serializers.AbstractSerializer; -import me.prettyprint.cassandra.serializers.StringSerializer; -import me.prettyprint.hector.api.ddl.ComparatorType; -import static me.prettyprint.hector.api.ddl.ComparatorType.UTF8TYPE; - -import org.apache.avro.util.Utf8; - -/** - * A Utf8Serializer translates the byte[] to and from Utf8 object of Avro. - */ -public final class Utf8Serializer extends AbstractSerializer { - - private static final Utf8Serializer instance = new Utf8Serializer(); - - public static Utf8Serializer get() { - return instance; - } - - @Override - public ByteBuffer toByteBuffer(Utf8 obj) { - if (obj == null) { - return null; - } - return StringSerializer.get().toByteBuffer(obj.toString()); - } - - @Override - public Utf8 fromByteBuffer(ByteBuffer byteBuffer) { - if (byteBuffer == null) { - return null; - } - return new Utf8(StringSerializer.get().fromByteBuffer(byteBuffer)); - } - - @Override - public ComparatorType getComparatorType() { - return UTF8TYPE; - } - -} Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java?rev=1586888&r1=1586887&r2=1586888&view=diff ============================================================================== --- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java (original) +++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraClient.java Sat Apr 12 19:21:53 2014 @@ -18,16 +18,11 @@ package org.apache.gora.cassandra.store; -import static org.apache.gora.cassandra.store.CassandraStore.colFamConsLvl; -import static org.apache.gora.cassandra.store.CassandraStore.readOpConsLvl; -import static org.apache.gora.cassandra.store.CassandraStore.writeOpConsLvl; - import 
java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Properties; import me.prettyprint.cassandra.model.ConfigurableConsistencyLevel; import me.prettyprint.cassandra.serializers.ByteBufferSerializer; @@ -52,62 +47,27 @@ import me.prettyprint.hector.api.HConsis import me.prettyprint.hector.api.Serializer; import org.apache.avro.generic.GenericArray; -import org.apache.avro.util.Utf8; import org.apache.gora.cassandra.query.CassandraQuery; import org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer; import org.apache.gora.mapreduce.GoraRecordReader; import org.apache.gora.persistency.impl.PersistentBase; -import org.apache.gora.persistency.State; -import org.apache.gora.persistency.StatefulHashMap; import org.apache.gora.query.Query; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * CassandraClient is where all of the primary datastore functionality is - * executed. Typically CassandraClient is invoked by calling - * {@link org.apache.gora.cassandra.store.CassandraStore#initialize(Class, Class, Properties)}. - * CassandraClient deals with Cassandra data model definition, mutation, - * and general/specific mappings. - * @see {@link org.apache.gora.cassandra.store.CassandraStore#initialize(Class, Class, Properties)} - * - * @param - * @param - */ public class CassandraClient { - - /** The logging implementation */ public static final Logger LOG = LoggerFactory.getLogger(CassandraClient.class); - + private Cluster cluster; private Keyspace keyspace; private Mutator mutator; private Class keyClass; private Class persistentClass; - - /** Object containing the XML mapping for Cassandra. */ + private CassandraMapping cassandraMapping = null; - /** Hector client default column family consistency level. */ - public static final String DEFAULT_HECTOR_CONSIS_LEVEL = "QUORUM"; - - /** Cassandra serializer to be used for serializing Gora's keys. 
*/ private Serializer keySerializer; - - /** - * Given our key, persistentClass from - * {@link org.apache.gora.cassandra.store.CassandraStore#initialize(Class, Class, Properties)} - * we make best efforts to dictate our data model. - * We make a quick check within {@link org.apache.gora.cassandra.store.CassandraClient#checkKeyspace(String) - * to see if our keyspace has already been invented, this simple check prevents us from - * recreating the keyspace if it already exists. - * We then simple specify (based on the input keyclass) an appropriate serializer - * via {@link org.apache.gora.cassandra.serializers.GoraSerializerTypeInferer} before - * defining a mutator from and by which we can mutate this object. - * @param keyClass the Key by which we wish o assign a record object - * @param persistentClass the generated {@link org.apache.org.gora.persistency.Peristent} bean representing the data. - * @throws Exception - */ + public void initialize(Class keyClass, Class persistentClass) throws Exception { this.keyClass = keyClass; @@ -115,16 +75,14 @@ public class CassandraClientgora.properites, - * then column family consistency level is set to QUORUM (by default) which permits - * consistency to wait for a quorum of replicas to respond regardless of data center. - * QUORUM is Hector Client's default setting and we respect that here as well. - * - * @see http://hector-client.github.io/hector/build/html/content/consistency_level.html + * In this method, we also utilise Hector's {@ConfigurableConsistencyLevel} + * logic. It is set by passing a ConfigurableConsistencyLevel object right + * when the Keyspace is created. Currently consistency level is .ONE which + * permits consistency to wait until one replica has responded. 
*/ public void checkKeyspace() { // "describe keyspace ;" query @@ -164,25 +114,24 @@ public class CassandraClient clmap = getConsisLevelForColFams(columnFamilyDefinitions); - // Column family consistency levels - ccl.setReadCfConsistencyLevels(clmap); - ccl.setWriteCfConsistencyLevels(clmap); - // Operations consistency levels - String opConsisLvl = (readOpConsLvl!=null || !readOpConsLvl.isEmpty())?readOpConsLvl:DEFAULT_HECTOR_CONSIS_LEVEL; - ccl.setDefaultReadConsistencyLevel(HConsistencyLevel.valueOf(opConsisLvl)); - LOG.debug("Hector read consistency configured to '" + opConsisLvl + "'."); - opConsisLvl = (writeOpConsLvl!=null || !writeOpConsLvl.isEmpty())?writeOpConsLvl:DEFAULT_HECTOR_CONSIS_LEVEL; - ccl.setDefaultWriteConsistencyLevel(HConsistencyLevel.valueOf(opConsisLvl)); - LOG.debug("Hector write consistency configured to '" + opConsisLvl + "'."); + // LOG.info("Keyspace '" + this.cassandraMapping.getKeyspaceName() + "' in cluster '" + this.cassandraMapping.getClusterName() + "' was created on host '" + this.cassandraMapping.getHostName() + "'"); + + // Create a customized Consistency Level + ConfigurableConsistencyLevel configurableConsistencyLevel = new ConfigurableConsistencyLevel(); + Map clmap = new HashMap(); + + // Define CL.ONE for ColumnFamily "ColumnFamily" + clmap.put("ColumnFamily", HConsistencyLevel.ONE); + + // In this we use CL.ONE for read and writes. But you can use different CLs if needed. + configurableConsistencyLevel.setReadCfConsistencyLevels(clmap); + configurableConsistencyLevel.setWriteCfConsistencyLevels(clmap); // Then let the keyspace know - HFactory.createKeyspace("Keyspace", this.cluster, ccl); + HFactory.createKeyspace("Keyspace", this.cluster, configurableConsistencyLevel); + keyspaceDefinition = null; } else { @@ -196,10 +145,11 @@ public class CassandraClient with the mapping between colFams and consistency level. 
- */ - private Map getConsisLevelForColFams(List pColFams) { - Map clMap = new HashMap(); - // Get columnFamily consistency level. - String colFamConsisLvl = (colFamConsLvl != null && !colFamConsLvl.isEmpty())?colFamConsLvl:DEFAULT_HECTOR_CONSIS_LEVEL; - LOG.debug("ColumnFamily consistency level configured to '" + colFamConsisLvl + "'."); - // Define consistency for ColumnFamily "ColumnFamily" - for (ColumnFamilyDefinition colFamDef : pColFams) - clMap.put(colFamDef.getName(), HConsistencyLevel.valueOf(colFamConsisLvl)); - return clMap; - } - - /** * Drop keyspace. */ public void dropKeyspace() { - // "drop keyspace ;" query this.cluster.dropKeyspace(this.cassandraMapping.getKeyspaceName()); } @@ -245,21 +178,40 @@ public class CassandraClient getReverseMap(Query query) { Map map = new HashMap(); for (String field: query.getFields()) { String family = this.getMappingFamily(field); String column = this.getMappingColumn(field); - + map.put(family + ":" + column, field); } + return map; } + /** + * Determines if a column is a superColumn or not. 
+ * @param family + * @return boolean + */ public boolean isSuper(String family) { return this.cassandraMapping.isSuper(family); } @@ -542,19 +508,20 @@ public class CassandraClient rangeSuperSlicesQuery = - HFactory.createRangeSuperSlicesQuery(this.keyspace, this.keySerializer, StringSerializer.get(), - ByteBufferSerializer.get(), ByteBufferSerializer.get()); + + RangeSuperSlicesQuery rangeSuperSlicesQuery = HFactory.createRangeSuperSlicesQuery(this.keyspace, this.keySerializer, StringSerializer.get(), ByteBufferSerializer.get(), ByteBufferSerializer.get()); rangeSuperSlicesQuery.setColumnFamily(family); rangeSuperSlicesQuery.setKeys(startKey, endKey); rangeSuperSlicesQuery.setRange("", "", false, GoraRecordReader.BUFFER_LIMIT_READ_VALUE); rangeSuperSlicesQuery.setRowCount(limit); rangeSuperSlicesQuery.setColumnNames(columnNames); - + + QueryResult> queryResult = rangeSuperSlicesQuery.execute(); OrderedSuperRows orderedRows = queryResult.get(); return orderedRows.getList(); + + } /** @@ -562,6 +529,6 @@ public class CassandraClient getFamilyMap(){ return this.familyMap; } - /** - * Gets all attributes related to a column. 
- * @return - */ public Map getColumnsAttribs(){ return this.columnAttrMap; } Modified: gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java URL: http://svn.apache.org/viewvc/gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java?rev=1586888&r1=1586887&r2=1586888&view=diff ============================================================================== --- gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java (original) +++ gora/trunk/gora-cassandra/src/main/java/org/apache/gora/cassandra/store/CassandraMappingManager.java Sat Apr 12 19:21:53 2014 @@ -49,7 +49,7 @@ public class CassandraMappingManager { return manager; } - /** + /** * Objects to maintain mapped keyspaces */ private Map keyspaceMap = null; @@ -78,7 +78,7 @@ public class CassandraMappingManager { } String keyspaceName = mappingElement.getAttributeValue(KEYSPACE_ELEMENT); if (LOG.isDebugEnabled()) { - LOG.debug("className=" + className + " -> keyspaceName=" + keyspaceName); + LOG.debug("persistentClassName=" + className + " -> keyspaceName=" + keyspaceName); } Element keyspaceElement = keyspaceMap.get(keyspaceName); if (keyspaceElement == null) {