Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 17161200CE1 for ; Fri, 28 Jul 2017 08:02:07 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1620016C2E2; Fri, 28 Jul 2017 06:02:07 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 1000C16C2CF for ; Fri, 28 Jul 2017 08:02:05 +0200 (CEST) Received: (qmail 74300 invoked by uid 500); 28 Jul 2017 06:02:05 -0000 Mailing-List: contact dev-help@gora.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@gora.apache.org Delivered-To: mailing list dev@gora.apache.org Received: (qmail 74289 invoked by uid 99); 28 Jul 2017 06:02:05 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 28 Jul 2017 06:02:05 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id CA699C02BE for ; Fri, 28 Jul 2017 06:02:04 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -100.002 X-Spam-Level: X-Spam-Status: No, score=-100.002 tagged_above=-999 required=6.31 tests=[RP_MATCHES_RCVD=-0.001, SPF_PASS=-0.001, USER_IN_WHITELIST=-100] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id 1InAh2nczxi0 for ; Fri, 28 Jul 2017 06:02:02 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with ESMTP id C71CE60D9B for ; Fri, 28 Jul 2017 06:02:01 +0000 (UTC) Received: from jira-lw-us.apache.org (unknown [207.244.88.139]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id 19922E0D57 for ; Fri, 28 Jul 2017 06:02:01 +0000 (UTC) Received: from jira-lw-us.apache.org (localhost [127.0.0.1]) by jira-lw-us.apache.org (ASF Mail Server at jira-lw-us.apache.org) with ESMTP id 6ECC124D2D for ; Fri, 28 Jul 2017 06:02:00 +0000 (UTC) Date: Fri, 28 Jul 2017 06:02:00 +0000 (UTC) From: "ASF GitHub Bot (JIRA)" To: dev@gora.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (GORA-497) Migrate CassandraThrift to CQL MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 archived-at: Fri, 28 Jul 2017 06:02:07 -0000 [ https://issues.apache.org/jira/browse/GORA-497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16104496#comment-16104496 ] ASF GitHub Bot commented on GORA-497: ------------------------------------- Github user djkevincr commented on a diff in the pull request: https://github.com/apache/gora/pull/110#discussion_r130017958 --- Diff: gora-cassandra-cql/src/main/java/org/apache/gora/cassandra/serializers/AvroCassandraUtils.java --- @@ -0,0 +1,292 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gora.cassandra.serializers; + +import org.apache.avro.Schema; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.util.Utf8; +import org.apache.gora.cassandra.bean.CassandraKey; +import org.apache.gora.cassandra.bean.Field; +import org.apache.gora.cassandra.store.CassandraMapping; +import org.apache.gora.persistency.Persistent; +import org.apache.gora.persistency.impl.DirtyListWrapper; +import org.apache.gora.persistency.impl.DirtyMapWrapper; +import org.apache.gora.persistency.impl.PersistentBase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * This class is Utils class for Avro serialization. + */ +class AvroCassandraUtils { + + /** + * Default schema index with value "0" used when AVRO Union data types are stored. + */ + private static final int DEFAULT_UNION_SCHEMA = 0; + + private static final Logger LOG = LoggerFactory.getLogger(AvroCassandraUtils.class); + + static void processKeys(CassandraMapping cassandraMapping, Object key, List keys, List values) { + CassandraKey cassandraKey = cassandraMapping.getCassandraKey(); + if (cassandraMapping.isPartitionKeyDefined()) { + if (cassandraKey != null) { + if (key instanceof PersistentBase) { + PersistentBase keyBase = (PersistentBase) key; + for (Schema.Field field : keyBase.getSchema().getFields()) { + if (cassandraMapping.getFieldFromFieldName(field.name()) != null) { + keys.add(field.name()); + Object value = keyBase.get(field.pos()); + value = getFieldValueFromAvroBean(field.schema(), field.schema().getType(), value); + values.add(value); + } else { + LOG.debug("Ignoring field {}, Since field couldn't find in the {} mapping", new Object[]{field.name(), cassandraMapping.getPersistentClass()}); + } + } + } else { + LOG.error("Key bean isn't extended by {} .", new Object[]{cassandraMapping.getKeyClass(), PersistentBase.class}); + } + } else { + for (Field field : cassandraMapping.getInlinedDefinedPartitionKeys()) { + keys.add(field.getFieldName()); + values.add(key); + } + } + } else { + keys.add(cassandraMapping.getDefaultCassandraKey().getFieldName()); + values.add(key.toString()); + } + } + + /** + * For every field within an object, we pass in a field schema, Type and value. + * This enables us to process fields (based on their characteristics) + * preparing them for persistence. + * + * @param fieldSchema the associated field schema + * @param type the field type + * @param fieldValue the field value. + * @return field value + */ + static Object getFieldValueFromAvroBean(Schema fieldSchema, Schema.Type type, Object fieldValue) { + switch (type) { + case RECORD: + PersistentBase persistent = (PersistentBase) fieldValue; + PersistentBase newRecord = (PersistentBase) SpecificData.get().newRecord(persistent, persistent.getSchema()); + for (Schema.Field member : fieldSchema.getFields()) { + if (member.pos() == 0 || !persistent.isDirty()) { + continue; + } + Schema memberSchema = member.schema(); + Schema.Type memberType = memberSchema.getType(); + Object memberValue = persistent.get(member.pos()); + newRecord.put(member.pos(), getFieldValueFromAvroBean(memberSchema, memberType, memberValue)); + } + fieldValue = newRecord; + break; + case MAP: + Schema valueSchema = fieldSchema.getValueType(); + Schema.Type valuetype = valueSchema.getType(); + HashMap map = new HashMap<>(); + for (Map.Entry e : ((Map) fieldValue).entrySet()) { + String mapKey = e.getKey().toString(); + Object mapValue = e.getValue(); + mapValue = getFieldValueFromAvroBean(valueSchema, valuetype, mapValue); + map.put(mapKey, mapValue); + } + fieldValue = map; + break; + case ARRAY: + valueSchema = fieldSchema.getElementType(); + valuetype = valueSchema.getType(); + ArrayList list = new ArrayList<>(); + for (Object item : (Collection) fieldValue) { + Object value = getFieldValueFromAvroBean(valueSchema, valuetype, item); + list.add(value); + } + fieldValue = list; + break; + case UNION: + // storing the union selected schema, the actual value will + // be stored as soon as we get break out. + if (fieldValue != null) { + int schemaPos = getUnionSchema(fieldValue, fieldSchema); + Schema unionSchema = fieldSchema.getTypes().get(schemaPos); + Schema.Type unionType = unionSchema.getType(); + fieldValue = getFieldValueFromAvroBean(unionSchema, unionType, fieldValue); + } + break; + case STRING: + fieldValue = fieldValue.toString(); + break; + default: + break; + } + return fieldValue; + } + + /** + * Given an object and the object schema this function obtains, + * from within the UNION schema, the position of the type used. + * If no data type can be inferred then we return a default value + * of position 0. + * + * @param pValue Object + * @param pUnionSchema avro Schema + * @return the unionSchemaPosition. + */ + private static int getUnionSchema(Object pValue, Schema pUnionSchema) { + int unionSchemaPos = 0; +// String valueType = pValue.getClass().getSimpleName(); + for (Schema currentSchema : pUnionSchema.getTypes()) { + Schema.Type schemaType = currentSchema.getType(); + if (pValue instanceof CharSequence && schemaType.equals(Schema.Type.STRING)) + return unionSchemaPos; + else if (pValue instanceof ByteBuffer && schemaType.equals(Schema.Type.BYTES)) + return unionSchemaPos; + else if (pValue instanceof Integer && schemaType.equals(Schema.Type.INT)) + return unionSchemaPos; + else if (pValue instanceof Long && schemaType.equals(Schema.Type.LONG)) + return unionSchemaPos; + else if (pValue instanceof Double && schemaType.equals(Schema.Type.DOUBLE)) + return unionSchemaPos; + else if (pValue instanceof Float && schemaType.equals(Schema.Type.FLOAT)) + return unionSchemaPos; + else if (pValue instanceof Boolean && schemaType.equals(Schema.Type.BOOLEAN)) + return unionSchemaPos; + else if (pValue instanceof Map && schemaType.equals(Schema.Type.MAP)) + return unionSchemaPos; + else if (pValue instanceof List && schemaType.equals(Schema.Type.ARRAY)) + return unionSchemaPos; + else if (pValue instanceof Persistent && schemaType.equals(Schema.Type.RECORD)) + return unionSchemaPos; + unionSchemaPos++; + } + // if we weren't able to determine which data type it is, then we return the default + return DEFAULT_UNION_SCHEMA; + } + + static String encodeFieldKey(final String key) { + if (key == null) { + return null; + } + return key.replace(".", "\u00B7") + .replace(":", "\u00FF") + .replace(";", "\u00FE") + .replace(" ", "\u00FD") + .replace("%", "\u00FC") + .replace("=", "\u00FB"); + } + + static String decodeFieldKey(final String key) { --- End diff -- This method doesn't have any usages. > Migrate CassandraThrift to CQL > ------------------------------- > > Key: GORA-497 > URL: https://issues.apache.org/jira/browse/GORA-497 > Project: Apache Gora > Issue Type: Improvement > Components: gora-cassandra > Reporter: Madhawa Gunasekara > Assignee: Madhawa Gunasekara > Labels: gsoc2017 > Fix For: 0.8 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)