From: ravipesala@apache.org
To: commits@carbondata.incubator.apache.org
Reply-To: dev@carbondata.incubator.apache.org
Date: Wed, 30 Nov 2016 07:51:44 -0000
Message-Id: <8e0e9387d8f74ca4b26223aaa3099be4@git.apache.org>
In-Reply-To: <9be1300b76484f41a3ab4aedaf2172cb@git.apache.org>
References: <9be1300b76484f41a3ab4aedaf2172cb@git.apache.org>
Subject: [06/14] incubator-carbondata git commit: rebase

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
deleted file mode 100644
index 61639d3..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.api;
-
-import java.io.Serializable;
-import java.util.List;
-
-public interface Partition extends Serializable {
-  /**
-   * unique identification for the partition in the cluster.
-   */
-  String getUniqueID();
-
-  /**
-   * File path for the raw data represented by this partition
-   */
-  String getFilePath();
-
-  /**
-   * result
-   *
-   * @return
-   */
-  List<String> getFilesPath();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java
deleted file mode 100644
index bc6e54f..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.api.impl;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Properties;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-
-public final class DataPartitionerProperties {
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(DataPartitionerProperties.class.getName());
-
-  private static DataPartitionerProperties instance;
-
-  private Properties properties;
-
-  private DataPartitionerProperties() {
-    properties = loadProperties();
-  }
-
-  public static DataPartitionerProperties getInstance() {
-    if (instance == null) {
-      instance = new DataPartitionerProperties();
-    }
-    return instance;
-  }
-
-  public String getValue(String key, String defaultVal) {
-    return properties.getProperty(key, defaultVal);
-  }
-
-  public String getValue(String key) {
-    return properties.getProperty(key);
-  }
-
-  /**
-   * Read the properties from CSVFilePartitioner.properties
-   */
-  private Properties loadProperties() {
-    Properties props = new Properties();
-
-    File file = new File("DataPartitioner.properties");
-    FileInputStream fis = null;
-    try {
-      if (file.exists()) {
-        fis = new FileInputStream(file);
-
-        props.load(fis);
-      }
-    } catch (Exception e) {
-      LOGGER.error(e, e.getMessage());
-    } finally {
-      if (null != fis) {
-        try {
-          fis.close();
-        } catch (IOException e) {
-          LOGGER.error(e, e.getMessage());
-        }
-      }
-    }
-
-    return props;
-  }
-}
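For readers following the deleted code: DataPartitionerProperties is a lazily-initialized singleton around java.util.Properties. A minimal standalone sketch of the same load-and-lookup pattern, modernized with try-with-resources (the file name matches the deleted class; the "nodes" key and default are hypothetical, and the file is assumed to exist in the working directory):

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.Properties;

    public final class PropertiesExample {
      public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        // Load from the working directory, as the deleted class did;
        // try-with-resources closes the stream even when load() fails.
        try (FileInputStream fis = new FileInputStream("DataPartitioner.properties")) {
          props.load(fis);
        }
        // Lookup with a fallback, mirroring getValue(key, defaultVal).
        String nodes = props.getProperty("nodes", "localhost");
        System.out.println("nodes = " + nodes);
      }
    }
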

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java
deleted file mode 100644
index 9bee8a2..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.api.impl;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.spark.partition.api.Partition;
-
-/**
- * A sample load balancer to distribute the partitions to the available
- * nodes in a round robin mode.
- */
-public class DefaultLoadBalancer {
-  private Map<String, List<Partition>> nodeToPartitonMap =
-      new HashMap<String, List<Partition>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-  private Map<Partition, String> partitonToNodeMap =
-      new HashMap<Partition, String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-  public DefaultLoadBalancer(List<String> nodes, List<Partition> partitions) {
-    //Per form a round robin allocation
-    int nodeCount = nodes.size();
-
-    int partitioner = 0;
-    for (Partition partition : partitions) {
-      int nodeindex = partitioner % nodeCount;
-      String node = nodes.get(nodeindex);
-
-      List<Partition> oldList = nodeToPartitonMap.get(node);
-      if (oldList == null) {
-        oldList = new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-        nodeToPartitonMap.put(node, oldList);
-      }
-      oldList.add(partition);
-
-      partitonToNodeMap.put(partition, node);
-
-      partitioner++;
-    }
-  }
-
-  public List<Partition> getPartitionsForNode(String node) {
-    return nodeToPartitonMap.get(node);
-  }
-
-  public String getNodeForPartitions(Partition partition) {
-    return partitonToNodeMap.get(partition);
-  }
-}
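The constructor above is plain round-robin assignment: partition i goes to node i % nodeCount, with a reverse index kept for location lookups. A self-contained sketch of just that loop (strings stand in for Partition objects; all names and counts are illustrative):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public final class RoundRobinExample {
      public static void main(String[] args) {
        List<String> nodes = List.of("node-a", "node-b", "node-c");
        Map<String, List<String>> nodeToPartitions = new HashMap<>();
        // Partition i is assigned to node (i % nodeCount), exactly as in
        // the DefaultLoadBalancer constructor above.
        for (int i = 0; i < 10; i++) {
          String node = nodes.get(i % nodes.size());
          nodeToPartitions.computeIfAbsent(node, k -> new ArrayList<>())
              .add("partition-" + i);
        }
        nodeToPartitions.forEach((node, parts) -> System.out.println(node + " -> " + parts));
      }
    }
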

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java
deleted file mode 100644
index bd7cc42..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.api.impl;
-
-import java.util.List;
-
-import org.apache.carbondata.spark.partition.api.Partition;
-
-public class PartitionImpl implements Partition {
-  private static final long serialVersionUID = 3020172346383028547L;
-  private String uniqueID;
-  private String folderPath;
-
-
-  public PartitionImpl(String uniqueID, String folderPath) {
-    this.uniqueID = uniqueID;
-    this.folderPath = folderPath;
-  }
-
-  @Override public String getUniqueID() {
-    return uniqueID;
-  }
-
-  @Override public String getFilePath() {
-    return folderPath;
-  }
-
-  @Override public String toString() {
-    return "{PartitionID -> " + uniqueID + " Path: " + folderPath + '}';
-  }
-
-  @Override public List<String> getFilesPath() {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java
deleted file mode 100644
index de32b5c..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.api.impl;
-
-import java.util.List;
-
-import org.apache.carbondata.spark.partition.api.Partition;
-
-public class PartitionMultiFileImpl implements Partition {
-  private static final long serialVersionUID = -4363447826181193976L;
-  private String uniqueID;
-  private List<String> folderPath;
-
-  public PartitionMultiFileImpl(String uniqueID, List<String> folderPath) {
-    this.uniqueID = uniqueID;
-    this.folderPath = folderPath;
-  }
-
-  @Override public String getUniqueID() {
-    // TODO Auto-generated method stub
-    return uniqueID;
-  }
-
-  @Override public String getFilePath() {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  @Override public List<String> getFilesPath() {
-    // TODO Auto-generated method stub
-    return folderPath;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
deleted file mode 100644
index e05be7d..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.api.impl;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.scan.model.CarbonQueryPlan;
-import org.apache.carbondata.spark.partition.api.DataPartitioner;
-import org.apache.carbondata.spark.partition.api.Partition;
-
-
-public final class QueryPartitionHelper {
-  private static QueryPartitionHelper instance = new QueryPartitionHelper();
-  private Map<String, DataPartitioner> partitionerMap =
-      new HashMap<String, DataPartitioner>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-  private Map<String, DefaultLoadBalancer> loadBalancerMap =
-      new HashMap<String, DefaultLoadBalancer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-  private QueryPartitionHelper() {
-
-  }
-
-  public static QueryPartitionHelper getInstance() {
-    return instance;
-  }
-
-  /**
-   * Get partitions applicable for query based on filters applied in query
-   */
-  public List<Partition> getPartitionsForQuery(CarbonQueryPlan queryPlan) {
-    String tableUniqueName = queryPlan.getDatabaseName() + '_' + queryPlan.getTableName();
-
-    DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
-
-    List<Partition> queryPartitions = dataPartitioner.getPartitions(queryPlan);
-    return queryPartitions;
-  }
-
-  public List<Partition> getAllPartitions(String databaseName, String tableName) {
-    String tableUniqueName = databaseName + '_' + tableName;
-
-    DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
-
-    return dataPartitioner.getAllPartitions();
-  }
-
-  /**
-   * Get the node name where the partition is assigned to.
-   */
-  public String getLocation(Partition partition, String databaseName, String tableName) {
-    String tableUniqueName = databaseName + '_' + tableName;
-
-    DefaultLoadBalancer loadBalancer = loadBalancerMap.get(tableUniqueName);
-    return loadBalancer.getNodeForPartitions(partition);
-  }
-
-}
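Note that every lookup in QueryPartitionHelper is keyed by a table-unique name built as databaseName + '_' + tableName. A tiny sketch of that keying convention (a hypothetical string-valued registry, not the real class, whose maps are populated elsewhere):

    import java.util.HashMap;
    import java.util.Map;

    public final class TableKeyExample {
      public static void main(String[] args) {
        Map<String, String> partitionerByTable = new HashMap<>();
        String databaseName = "default";   // illustrative values
        String tableName = "sales";
        // Same key construction as getPartitionsForQuery/getAllPartitions above.
        String tableUniqueName = databaseName + '_' + tableName;
        partitionerByTable.put(tableUniqueName, "SampleDataPartitionerImpl");
        System.out.println(partitionerByTable.get("default_sales"));
      }
    }
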

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
deleted file mode 100644
index c9b434a..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.partition.api.impl;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.scan.model.CarbonQueryPlan;
-import org.apache.carbondata.spark.partition.api.DataPartitioner;
-import org.apache.carbondata.spark.partition.api.Partition;
-
-import org.apache.spark.sql.execution.command.Partitioner;
-
-/**
- * Sample partition.
- */
-public class SampleDataPartitionerImpl implements DataPartitioner {
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(SampleDataPartitionerImpl.class.getName());
-  private int numberOfPartitions = 1;
-
-  private int partionColumnIndex = -1;
-
-  private String partitionColumn;
-
-  private Partitioner partitioner;
-  private List<Partition> allPartitions;
-  private String baseLocation;
-
-  public SampleDataPartitionerImpl() {
-  }
-
-  public void initialize(String basePath, String[] columns, Partitioner partitioner) {
-    this.partitioner = partitioner;
-    numberOfPartitions = partitioner.partitionCount();
-
-    partitionColumn = partitioner.partitionColumn()[0];
-    LOGGER.info("SampleDataPartitionerImpl initializing with following properties.");
-    LOGGER.info("partitionCount: " + numberOfPartitions);
-    LOGGER.info("partitionColumn: " + partitionColumn);
-    LOGGER.info("basePath: " + basePath);
-    LOGGER.info("columns: " + Arrays.toString(columns));
-
-    this.baseLocation = basePath;
-    allPartitions = new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-
-    for (int i = 0; i < columns.length; i++) {
-      if (columns[i].equalsIgnoreCase(partitionColumn)) {
-        partionColumnIndex = i;
-        break;
-      }
-    }
-
-    for (int partionCounter = 0; partionCounter < numberOfPartitions; partionCounter++) {
-      PartitionImpl partitionImpl =
-          new PartitionImpl("" + partionCounter, baseLocation + '/' + partionCounter);
-
-      List<Integer> includedHashes =
-          new ArrayList<Integer>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-      includedHashes.add(partionCounter);
-
-      allPartitions.add(partitionImpl);
-    }
-  }
-
-  @Override public Partition getPartionForTuple(Object[] tuple, long rowCounter) {
-    int hashCode;
-    if (partionColumnIndex == -1) {
-      hashCode = hashCode(rowCounter);
-    } else {
-      try {
-        hashCode = hashCode(((String) tuple[partionColumnIndex]).hashCode());
-      } catch (NumberFormatException e) {
-        hashCode = hashCode(0);
-      }
-    }
-    return allPartitions.get(hashCode);
-  }
-
-  /**
-   *
-   */
-  public List<Partition> getAllPartitions() {
-    return allPartitions;
-  }
-
-  /**
-   * @see DataPartitioner#getPartitions(CarbonQueryPlan)
-   */
-  public List<Partition> getPartitions(CarbonQueryPlan queryPlan) {
-    // TODO: this has to be redone during partitioning implmentatation
-    return allPartitions;
-  }
-
-  /**
-   * Identify the partitions applicable for the given filter
-   */
-  public List<Partition> getPartitions() {
-    return allPartitions;
-
-    // TODO: this has to be redone during partitioning implementation
-    // for (Partition aPartition : allPartitions) {
-    //   CarbonDimensionLevelFilter partitionFilterDetails =
-    //       aPartition.getPartitionDetails().get(partitionColumn);
-    //
-    //   //Check if the partition is serving any of the
-    //   //hash code generated for include filter of query
-    //   for (Object includeFilter : msisdnFilter.getIncludeFilter()) {
-    //     int hashCode = hashCode(((String) includeFilter).hashCode());
-    //     if (partitionFilterDetails.getIncludeFilter().contains(hashCode)) {
-    //       allowedPartitions.add(aPartition);
-    //       break;
-    //     }
-    //   }
-    // }
-
-  }
-
-  private int hashCode(long key) {
-    return (int) (Math.abs(key) % numberOfPartitions);
-  }
-
-  @Override public String[] getPartitionedColumns() {
-    return new String[] { partitionColumn };
-  }
-
-  @Override public Partitioner getPartitioner() {
-    return partitioner;
-  }
-
-}
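getPartionForTuple above routes a row with Math.abs(hash) % numberOfPartitions, falling back to the row counter when no partition column is configured. A standalone sketch of that routing rule (partition count and inputs are illustrative):

    public final class HashRoutingExample {
      private static final int NUMBER_OF_PARTITIONS = 3;

      // Same reduction as the private hashCode(long) method above.
      // Caveat shared with the original: Math.abs(Long.MIN_VALUE) is negative.
      private static int partitionFor(long key) {
        return (int) (Math.abs(key) % NUMBER_OF_PARTITIONS);
      }

      public static void main(String[] args) {
        // Partition column present: hash the string value of that column.
        System.out.println(partitionFor("user-42".hashCode()));
        // No partition column configured: fall back to the row counter.
        long rowCounter = 1007L;
        System.out.println(partitionFor(rowCounter));
      }
    }
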

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
deleted file mode 100644
index bb8fc5c..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.spark.readsupport;
-
-import java.sql.Timestamp;
-
-import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
-import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.hadoop.readsupport.impl.AbstractDictionaryDecodedReadSupport;
-
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.expressions.GenericRow;
-import org.apache.spark.unsafe.types.UTF8String;
-
-public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSupport<Row> {
-
-  @Override public void initialize(CarbonColumn[] carbonColumns,
-      AbsoluteTableIdentifier absoluteTableIdentifier) {
-    super.initialize(carbonColumns, absoluteTableIdentifier);
-    //can initialize and generate schema here.
-  }
-
-  @Override public Row readRow(Object[] data) {
-    for (int i = 0; i < dictionaries.length; i++) {
-      if (dictionaries[i] != null) {
-        data[i] = DataTypeUtil
-            .getDataBasedOnDataType(dictionaries[i].getDictionaryValueForKey((int) data[i]),
-                dataTypes[i]);
-        switch (dataTypes[i]) {
-          case STRING:
-            data[i] = UTF8String.fromString(data[i].toString());
-            break;
-          case TIMESTAMP:
-            data[i] = new Timestamp((long) data[i] / 1000);
-            break;
-          case LONG:
-            data[i] = data[i];
-            break;
-          default:
-        }
-      } else if (carbonColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
-        //convert the long to timestamp in case of direct dictionary column
-        if (DataType.TIMESTAMP == carbonColumns[i].getDataType()) {
-          data[i] = new Timestamp((long) data[i] / 1000);
-        }
-      }
-    }
-    return new GenericRow(data);
-  }
-}
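readRow above first replaces dictionary keys with their decoded values, then coerces by data type; timestamp columns are divided by 1000 before constructing java.sql.Timestamp, which implies the stored value is in microseconds since the epoch (an inference from the division, since Timestamp's constructor takes milliseconds). A minimal sketch of just that conversion, under that assumption:

    import java.sql.Timestamp;

    public final class TimestampDecodeExample {
      public static void main(String[] args) {
        // Value as stored: assumed microseconds since epoch, mirroring the
        // "/ 1000" in readRow above (illustrative value).
        long micros = 1480485099000000L;
        // Timestamp(long) expects milliseconds, hence the division.
        Timestamp ts = new Timestamp(micros / 1000);
        System.out.println(ts);
      }
    }
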

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/splits/TableSplit.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/splits/TableSplit.java b/integration/spark/src/main/java/org/apache/carbondata/spark/splits/TableSplit.java
deleted file mode 100644
index 3fb24e2..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/splits/TableSplit.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- *
- */
-package org.apache.carbondata.spark.splits;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.spark.partition.api.Partition;
-
-import org.apache.hadoop.io.Writable;
-
-
-/**
- * It represents one region server as one split.
- */
-public class TableSplit implements Serializable, Writable {
-  private static final long serialVersionUID = -8058151330863145575L;
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(TableSplit.class.getName());
-  private List<String> locations =
-      new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-
-  private Partition partition;
-
-  /**
-   * @return the locations
-   */
-  public List<String> getLocations() {
-    return locations;
-  }
-
-  /**
-   * @param locations the locations to set
-   */
-  public void setLocations(List<String> locations) {
-    this.locations = locations;
-  }
-
-  /**
-   * @return Returns the partitions.
-   */
-  public Partition getPartition() {
-    return partition;
-  }
-
-  /**
-   * @param partition The partitions to set.
-   */
-  public void setPartition(Partition partition) {
-    this.partition = partition;
-  }
-
-  @Override public void readFields(DataInput in) throws IOException {
-
-    int sizeLoc = in.readInt();
-    for (int i = 0; i < sizeLoc; i++) {
-      byte[] b = new byte[in.readInt()];
-      in.readFully(b);
-      locations.add(new String(b, Charset.defaultCharset()));
-    }
-
-    byte[] buf = new byte[in.readInt()];
-    in.readFully(buf);
-    ByteArrayInputStream bis = new ByteArrayInputStream(buf);
-    ObjectInputStream ois = new ObjectInputStream(bis);
-    try {
-      partition = (Partition) ois.readObject();
-    } catch (ClassNotFoundException e) {
-      LOGGER.error(e, e.getMessage());
-    }
-    ois.close();
-  }
-
-  @Override public void write(DataOutput out) throws IOException {
-
-    int sizeLoc = locations.size();
-    out.writeInt(sizeLoc);
-    for (int i = 0; i < sizeLoc; i++) {
-      byte[] bytes = locations.get(i).getBytes(Charset.defaultCharset());
-      out.writeInt(bytes.length);
-      out.write(bytes);
-    }
-
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-
-    ObjectOutputStream obs = new ObjectOutputStream(bos);
-    obs.writeObject(partition);
-    obs.close();
-    byte[] byteArray = bos.toByteArray();
-    out.writeInt(byteArray.length);
-    out.write(byteArray);
-  }
-
-  public String toString() {
-    return partition.getUniqueID() + ' ' + locations;
-  }
-}
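TableSplit above hand-rolls its Hadoop Writable encoding: a count, then a length-prefixed byte array per location, then the Java-serialized partition. A round-trip sketch of the length-prefixed string half (strings only; the partition half needs the Carbon classes). It deliberately uses UTF-8 rather than the original's Charset.defaultCharset(), which is platform-dependent and so a portability hazard for data exchanged between nodes:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    public final class LengthPrefixedExample {
      public static void main(String[] args) throws IOException {
        List<String> locations = List.of("host-1", "host-2");

        // write(): count, then length + bytes per entry, as in TableSplit.write.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(locations.size());
        for (String loc : locations) {
          byte[] bytes = loc.getBytes(StandardCharsets.UTF_8);
          out.writeInt(bytes.length);
          out.write(bytes);
        }

        // readFields(): the mirror image of the loop above.
        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(bos.toByteArray()));
        List<String> decoded = new ArrayList<>();
        int count = in.readInt();
        for (int i = 0; i < count; i++) {
          byte[] b = new byte[in.readInt()];
          in.readFully(b);
          decoded.add(new String(b, StandardCharsets.UTF_8));
        }
        System.out.println(decoded);
      }
    }
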

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
deleted file mode 100644
index d2e716f..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.carbondata.spark.util;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.load.LoadMetadataDetails;
-import org.apache.carbondata.scan.model.CarbonQueryPlan;
-import org.apache.carbondata.spark.partition.api.Partition;
-import org.apache.carbondata.spark.partition.api.impl.DefaultLoadBalancer;
-import org.apache.carbondata.spark.partition.api.impl.PartitionMultiFileImpl;
-import org.apache.carbondata.spark.partition.api.impl.QueryPartitionHelper;
-import org.apache.carbondata.spark.splits.TableSplit;
-
-import org.apache.commons.lang3.StringUtils;
-
-/**
- * This utilty parses the Carbon query plan to actual query model object.
- */
-public final class CarbonQueryUtil {
-
-  private CarbonQueryUtil() {
-
-  }
-
-  /**
-   * It creates the one split for each region server.
-   */
-  public static synchronized TableSplit[] getTableSplits(String databaseName, String tableName,
-      CarbonQueryPlan queryPlan) throws IOException {
-
-    //Just create splits depends on locations of region servers
-    List<Partition> allPartitions = null;
-    if (queryPlan == null) {
-      allPartitions =
-          QueryPartitionHelper.getInstance().getAllPartitions(databaseName, tableName);
-    } else {
-      allPartitions =
-          QueryPartitionHelper.getInstance().getPartitionsForQuery(queryPlan);
-    }
-    TableSplit[] splits = new TableSplit[allPartitions.size()];
-    for (int i = 0; i < splits.length; i++) {
-      splits[i] = new TableSplit();
-      List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-      Partition partition = allPartitions.get(i);
-      String location = QueryPartitionHelper.getInstance()
-          .getLocation(partition, databaseName, tableName);
-      locations.add(location);
-      splits[i].setPartition(partition);
-      splits[i].setLocations(locations);
-    }
-
-    return splits;
-  }
-
-  /**
-   * It creates the one split for each region server.
-   */
-  public static TableSplit[] getTableSplitsForDirectLoad(String sourcePath) throws Exception {
-
-    //Just create splits depends on locations of region servers
-    DefaultLoadBalancer loadBalancer = null;
-    List<Partition> allPartitions = getAllFilesForDataLoad(sourcePath);
-    loadBalancer = new DefaultLoadBalancer(new ArrayList<String>(), allPartitions);
-    TableSplit[] tblSplits = new TableSplit[allPartitions.size()];
-    for (int i = 0; i < tblSplits.length; i++) {
-      tblSplits[i] = new TableSplit();
-      List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-      Partition partition = allPartitions.get(i);
-      String location = loadBalancer.getNodeForPartitions(partition);
-      locations.add(location);
-      tblSplits[i].setPartition(partition);
-      tblSplits[i].setLocations(locations);
-    }
-    return tblSplits;
-  }
-
-  /**
-   * split sourcePath by comma
-   */
-  public static void splitFilePath(String sourcePath, List<String> partitionsFiles,
-      String separator) {
-    if (StringUtils.isNotEmpty(sourcePath)) {
-      String[] files = sourcePath.split(separator);
-      for (String file : files) {
-        partitionsFiles.add(file);
-      }
-    }
-  }
-
-  private static List<Partition> getAllFilesForDataLoad(String sourcePath) throws Exception {
-    List<String> files = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    splitFilePath(sourcePath, files, CarbonCommonConstants.COMMA);
-    List<Partition> partitionList =
-        new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    Map<Integer, List<String>> partitionFiles = new HashMap<Integer, List<String>>();
-
-    partitionFiles.put(0, new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN));
-    partitionList.add(new PartitionMultiFileImpl(0 + "", partitionFiles.get(0)));
-
-    for (int i = 0; i < files.size(); i++) {
-      partitionFiles.get(i % 1).add(files.get(i));
-    }
-    return partitionList;
-  }
-
-  public static List<String> getListOfSlices(LoadMetadataDetails[] details) {
-    List<String> slices = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    if (null != details) {
-      for (LoadMetadataDetails oneLoad : details) {
-        if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(oneLoad.getLoadStatus())) {
-          String loadName = CarbonCommonConstants.LOAD_FOLDER + oneLoad.getLoadName();
-          slices.add(loadName);
-        }
-      }
-    }
-    return slices;
-  }
-
-}
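splitFilePath above is a null-safe comma split that fans a source path out into per-file entries before splits are built. A dependency-free sketch of the same call shape (the paths are hypothetical; commons-lang3's StringUtils.isNotEmpty is replaced with a plain null/empty check):

    import java.util.ArrayList;
    import java.util.List;

    public final class SplitPathExample {
      // Same behavior as CarbonQueryUtil.splitFilePath, minus the commons-lang3
      // dependency: ignore null/empty input, otherwise split on the separator.
      static void splitFilePath(String sourcePath, List<String> partitionsFiles,
          String separator) {
        if (sourcePath != null && !sourcePath.isEmpty()) {
          for (String file : sourcePath.split(separator)) {
            partitionsFiles.add(file);
          }
        }
      }

      public static void main(String[] args) {
        List<String> files = new ArrayList<>();
        splitFilePath("/data/a.csv,/data/b.csv", files, ",");
        System.out.println(files);  // [/data/a.csv, /data/b.csv]
      }
    }
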

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
deleted file mode 100644
index 11cf9f8..0000000
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Project Name  : Carbon
- * Module Name   : CARBON Data Processor
- * Author        : R00903928
- * Created Date  : 15-Sep-2015
- * FileName      : LoadMetadataUtil.java
- * Description   : Kettle step to generate MD Key
- * Class Version : 1.0
- */
-package org.apache.carbondata.spark.util;
-
-import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.load.LoadMetadataDetails;
-import org.apache.carbondata.lcm.status.SegmentStatusManager;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-
-public final class LoadMetadataUtil {
-  private LoadMetadataUtil() {
-
-  }
-
-  public static boolean isLoadDeletionRequired(CarbonLoadModel loadModel) {
-    CarbonTable table = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
-        .getCarbonTable(loadModel.getDatabaseName() + '_' + loadModel.getTableName());
-
-    String metaDataLocation = table.getMetaDataFilepath();
-    LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
-    if (details != null && details.length != 0) {
-      for (LoadMetadataDetails oneRow : details) {
-        if ((CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(oneRow.getLoadStatus())
-            || CarbonCommonConstants.SEGMENT_COMPACTED.equalsIgnoreCase(oneRow.getLoadStatus()))
-            && oneRow.getVisibility().equalsIgnoreCase("true")) {
-          return true;
-        }
-      }
-    }
-
-    return false;
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
deleted file mode 100644
index ea97bca..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark
-
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-
- /**
-  * Carbon column validator
-  */
-class CarbonColumnValidator extends ColumnValidator {
-  def validateColumns(allColumns: Seq[ColumnSchema]) {
-    allColumns.foreach { columnSchema =>
-      val colWithSameId = allColumns.filter { x =>
-        x.getColumnUniqueId.equals(columnSchema.getColumnUniqueId)
-      }
-      if (colWithSameId.size > 1) {
-        throw new MalformedCarbonCommandException("Two column can not have same columnId")
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
index a1a2ecb..c464538 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonDataFrameWriter.scala
@@ -63,7 +63,7 @@ class CarbonDataFrameWriter(val dataFrame: DataFrame) {
    */
   private def loadTempCSV(options: CarbonOption, cc: CarbonContext): Unit = {
     // temporary solution: write to csv file, then load the csv into carbon
-    val storePath = CarbonEnv.getInstance(cc).carbonCatalog.storePath
+    val storePath = CarbonEnv.get.carbonMetastore.storePath
     val tempCSVFolder = new StringBuilder(storePath).append(CarbonCommonConstants.FILE_SEPARATOR)
       .append("tempCSV")
       .append(CarbonCommonConstants.UNDERSCORE).append(options.dbName)
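validateColumns above is O(n^2): for each column it re-filters the whole sequence looking for a matching ID. An alternative set-based single-pass sketch of the same duplicate check (plain Java; string IDs stand in for ColumnSchema, and this is a swapped-in technique, not the committed code):

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public final class DuplicateIdExample {
      public static void main(String[] args) {
        List<String> columnIds = List.of("c1", "c2", "c1");  // illustrative IDs
        Set<String> seen = new HashSet<>();
        for (String id : columnIds) {
          // Set.add returns false on a repeat, flagging the duplicate in one pass.
          if (!seen.add(id)) {
            throw new IllegalArgumentException(
                "Two columns can not have same columnId: " + id);
          }
        }
      }
    }
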
- */ - -package org.apache.carbondata.spark - -import scala.collection.mutable.ArrayBuffer - -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.optimizer.{AttributeReferenceWrapper, CarbonAliasDecoderRelation} -import org.apache.spark.sql.sources -import org.apache.spark.sql.types.StructType - -import org.apache.carbondata.core.carbon.metadata.datatype.DataType -import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable -import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn -import org.apache.carbondata.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression} -import org.apache.carbondata.scan.expression.conditional._ -import org.apache.carbondata.scan.expression.logical.{AndExpression, FalseExpression, OrExpression} -import org.apache.carbondata.spark.util.CarbonScalaUtil - -/** - * All filter conversions are done here. - */ -object CarbonFilters { - - /** - * Converts data sources filters to carbon filter predicates. - */ - def createCarbonFilter(schema: StructType, - predicate: sources.Filter): Option[CarbonExpression] = { - val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap - - def createFilter(predicate: sources.Filter): Option[CarbonExpression] = { - predicate match { - - case sources.EqualTo(name, value) => - Some(new EqualToExpression(getCarbonExpression(name), - getCarbonLiteralExpression(name, value))) - case sources.Not(sources.EqualTo(name, value)) => - Some(new NotEqualsExpression(getCarbonExpression(name), - getCarbonLiteralExpression(name, value))) - - case sources.EqualNullSafe(name, value) => - Some(new EqualToExpression(getCarbonExpression(name), - getCarbonLiteralExpression(name, value))) - case sources.Not(sources.EqualNullSafe(name, value)) => - Some(new NotEqualsExpression(getCarbonExpression(name), - getCarbonLiteralExpression(name, value))) - - case sources.GreaterThan(name, value) => - Some(new GreaterThanExpression(getCarbonExpression(name), - getCarbonLiteralExpression(name, value))) - case sources.LessThan(name, value) => - Some(new LessThanExpression(getCarbonExpression(name), - getCarbonLiteralExpression(name, value))) - case sources.GreaterThanOrEqual(name, value) => - Some(new GreaterThanEqualToExpression(getCarbonExpression(name), - getCarbonLiteralExpression(name, value))) - case sources.LessThanOrEqual(name, value) => - Some(new LessThanEqualToExpression(getCarbonExpression(name), - getCarbonLiteralExpression(name, value))) - - case sources.In(name, values) => - Some(new InExpression(getCarbonExpression(name), - new ListExpression( - convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList)))) - case sources.Not(sources.In(name, values)) => - Some(new NotInExpression(getCarbonExpression(name), - new ListExpression( - convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList)))) - - case sources.And(lhs, rhs) => - (createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _)) - - case sources.Or(lhs, rhs) => - for { - lhsFilter <- createFilter(lhs) - rhsFilter <- createFilter(rhs) - } yield { - new OrExpression(lhsFilter, rhsFilter) - } - - case _ => None - } - } - - def getCarbonExpression(name: String) = { - new CarbonColumnExpression(name, - CarbonScalaUtil.convertSparkToCarbonDataType(dataTypeOf(name))) - } - - def getCarbonLiteralExpression(name: String, value: Any): CarbonExpression = { - new CarbonLiteralExpression(value, - 
CarbonScalaUtil.convertSparkToCarbonDataType(dataTypeOf(name))) - } - - createFilter(predicate) - } - - - // Check out which filters can be pushed down to carbon, remaining can be handled in spark layer. - // Mostly dimension filters are only pushed down since it is faster in carbon. - def selectFilters(filters: Seq[Expression], - attrList: java.util.HashSet[AttributeReferenceWrapper], - aliasMap: CarbonAliasDecoderRelation): Unit = { - def translate(expr: Expression, or: Boolean = false): Option[sources.Filter] = { - expr match { - case or@ Or(left, right) => - - val leftFilter = translate(left, or = true) - val rightFilter = translate(right, or = true) - if (leftFilter.isDefined && rightFilter.isDefined) { - Some( sources.Or(leftFilter.get, rightFilter.get)) - } else { - or.collect { - case attr: AttributeReference => - attrList.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr))) - } - None - } - - case And(left, right) => - (translate(left) ++ translate(right)).reduceOption(sources.And) - - case EqualTo(a: Attribute, Literal(v, t)) => - Some(sources.EqualTo(a.name, v)) - case EqualTo(l@Literal(v, t), a: Attribute) => - Some(sources.EqualTo(a.name, v)) - case EqualTo(Cast(a: Attribute, _), Literal(v, t)) => - Some(sources.EqualTo(a.name, v)) - case EqualTo(Literal(v, t), Cast(a: Attribute, _)) => - Some(sources.EqualTo(a.name, v)) - - case Not(EqualTo(a: Attribute, Literal(v, t))) => new - Some(sources.Not(sources.EqualTo(a.name, v))) - case Not(EqualTo(Literal(v, t), a: Attribute)) => new - Some(sources.Not(sources.EqualTo(a.name, v))) - case Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) => new - Some(sources.Not(sources.EqualTo(a.name, v))) - case Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) => new - Some(sources.Not(sources.EqualTo(a.name, v))) - case IsNotNull(a: Attribute) => Some(sources.IsNotNull(a.name)) - case IsNull(a: Attribute) => Some(sources.IsNull(a.name)) - case Not(In(a: Attribute, list)) if !list.exists(!_.isInstanceOf[Literal]) => - val hSet = list.map(e => e.eval(EmptyRow)) - Some(sources.Not(sources.In(a.name, hSet.toArray))) - case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) => - val hSet = list.map(e => e.eval(EmptyRow)) - Some(sources.In(a.name, hSet.toArray)) - case Not(In(Cast(a: Attribute, _), list)) - if !list.exists(!_.isInstanceOf[Literal]) => - val hSet = list.map(e => e.eval(EmptyRow)) - Some(sources.Not(sources.In(a.name, hSet.toArray))) - case In(Cast(a: Attribute, _), list) if !list.exists(!_.isInstanceOf[Literal]) => - val hSet = list.map(e => e.eval(EmptyRow)) - Some(sources.In(a.name, hSet.toArray)) - - case GreaterThan(a: Attribute, Literal(v, t)) => - Some(sources.GreaterThan(a.name, v)) - case GreaterThan(Literal(v, t), a: Attribute) => - Some(sources.LessThan(a.name, v)) - case GreaterThan(Cast(a: Attribute, _), Literal(v, t)) => - Some(sources.GreaterThan(a.name, v)) - case GreaterThan(Literal(v, t), Cast(a: Attribute, _)) => - Some(sources.LessThan(a.name, v)) - - case LessThan(a: Attribute, Literal(v, t)) => - Some(sources.LessThan(a.name, v)) - case LessThan(Literal(v, t), a: Attribute) => - Some(sources.GreaterThan(a.name, v)) - case LessThan(Cast(a: Attribute, _), Literal(v, t)) => - Some(sources.LessThan(a.name, v)) - case LessThan(Literal(v, t), Cast(a: Attribute, _)) => - Some(sources.GreaterThan(a.name, v)) - - case GreaterThanOrEqual(a: Attribute, Literal(v, t)) => - Some(sources.GreaterThanOrEqual(a.name, v)) - case GreaterThanOrEqual(Literal(v, t), a: Attribute) => - 
-        Some(sources.LessThanOrEqual(a.name, v))
-      case GreaterThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
-        Some(sources.GreaterThanOrEqual(a.name, v))
-      case GreaterThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
-        Some(sources.LessThanOrEqual(a.name, v))
-
-      case LessThanOrEqual(a: Attribute, Literal(v, t)) =>
-        Some(sources.LessThanOrEqual(a.name, v))
-      case LessThanOrEqual(Literal(v, t), a: Attribute) =>
-        Some(sources.GreaterThanOrEqual(a.name, v))
-      case LessThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
-        Some(sources.LessThanOrEqual(a.name, v))
-      case LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
-        Some(sources.GreaterThanOrEqual(a.name, v))
-
-      case others =>
-        if (!or) {
-          others.collect {
-            case attr: AttributeReference =>
-              attrList.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
-          }
-        }
-        None
-      }
-    }
-    filters.flatMap(translate(_, false)).toArray
-  }
-
-  def processExpression(exprs: Seq[Expression],
-      attributesNeedToDecode: java.util.HashSet[AttributeReference],
-      unprocessedExprs: ArrayBuffer[Expression],
-      carbonTable: CarbonTable): Option[CarbonExpression] = {
-    def transformExpression(expr: Expression, or: Boolean = false): Option[CarbonExpression] = {
-      expr match {
-        case or @ Or(left, right) =>
-          val leftFilter = transformExpression(left, true)
-          val rightFilter = transformExpression(right, true)
-          if (leftFilter.isDefined && rightFilter.isDefined) {
-            Some(new OrExpression(leftFilter.get, rightFilter.get))
-          } else {
-            // an OR is pushed down only when both sides are translatable; otherwise its
-            // attributes must be decoded and the whole predicate is left to Spark
-            or.collect {
-              case attr: AttributeReference => attributesNeedToDecode.add(attr)
-            }
-            unprocessedExprs += or
-            None
-          }
-
-        case And(left, right) =>
-          (transformExpression(left) ++ transformExpression(right))
-            .reduceOption(new AndExpression(_, _))
-
-        case EqualTo(a: Attribute, l @ Literal(v, t)) =>
-          Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
-        case EqualTo(l @ Literal(v, t), a: Attribute) =>
-          Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
-        case EqualTo(Cast(a: Attribute, _), l @ Literal(v, t)) =>
-          Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
-        case EqualTo(l @ Literal(v, t), Cast(a: Attribute, _)) =>
-          Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
-
-        case Not(EqualTo(a: Attribute, l @ Literal(v, t))) =>
-          Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
-        case Not(EqualTo(l @ Literal(v, t), a: Attribute)) =>
-          Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
-        case Not(EqualTo(Cast(a: Attribute, _), l @ Literal(v, t))) =>
-          Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
-        case Not(EqualTo(l @ Literal(v, t), Cast(a: Attribute, _))) =>
-          Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
-
-        case IsNotNull(child: Attribute) =>
-          Some(new NotEqualsExpression(transformExpression(child).get,
-            transformExpression(Literal(null)).get, true))
-        case IsNull(child: Attribute) =>
-          Some(new EqualToExpression(transformExpression(child).get,
-            transformExpression(Literal(null)).get, true))
-
-        case Not(In(a: Attribute, list)) if !list.exists(!_.isInstanceOf[Literal]) =>
-          if (list.exists(x => isNullLiteral(x.asInstanceOf[Literal]))) {
-            Some(new FalseExpression(transformExpression(a).get))
-          } else {
-            Some(new NotInExpression(transformExpression(a).get,
-              new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
-          }
-        case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
-          Some(new InExpression(transformExpression(a).get,
-            new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
-        case Not(In(Cast(a: Attribute, _), list)) if !list.exists(!_.isInstanceOf[Literal]) =>
-          /* If an illogical expression such as NOT IN('scala', NULL) comes in as a
-             NOT IN filter, it is treated as a false expression and will always
-             return no result. */
-          if (list.exists(x => isNullLiteral(x.asInstanceOf[Literal]))) {
-            Some(new FalseExpression(transformExpression(a).get))
-          } else {
-            Some(new NotInExpression(transformExpression(a).get,
-              new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
-          }
-        case In(Cast(a: Attribute, _), list) if !list.exists(!_.isInstanceOf[Literal]) =>
-          Some(new InExpression(transformExpression(a).get,
-            new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
-
-        case GreaterThan(a: Attribute, l @ Literal(v, t)) =>
-          Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
-        case GreaterThan(Cast(a: Attribute, _), l @ Literal(v, t)) =>
-          Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
-        case GreaterThan(l @ Literal(v, t), a: Attribute) =>
-          Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
-        case GreaterThan(l @ Literal(v, t), Cast(a: Attribute, _)) =>
-          Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
-
-        case LessThan(a: Attribute, l @ Literal(v, t)) =>
-          Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
-        case LessThan(Cast(a: Attribute, _), l @ Literal(v, t)) =>
-          Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
-        case LessThan(l @ Literal(v, t), a: Attribute) =>
-          Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
-        case LessThan(l @ Literal(v, t), Cast(a: Attribute, _)) =>
-          Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
-
-        case GreaterThanOrEqual(a: Attribute, l @ Literal(v, t)) =>
-          Some(new GreaterThanEqualToExpression(transformExpression(a).get,
-            transformExpression(l).get))
-        case GreaterThanOrEqual(Cast(a: Attribute, _), l @ Literal(v, t)) =>
-          Some(new GreaterThanEqualToExpression(transformExpression(a).get,
-            transformExpression(l).get))
-        case GreaterThanOrEqual(l @ Literal(v, t), a: Attribute) =>
-          Some(new LessThanEqualToExpression(transformExpression(a).get,
-            transformExpression(l).get))
-        case GreaterThanOrEqual(l @ Literal(v, t), Cast(a: Attribute, _)) =>
-          Some(new LessThanEqualToExpression(transformExpression(a).get,
-            transformExpression(l).get))
-
-        case LessThanOrEqual(a: Attribute, l @ Literal(v, t)) =>
-          Some(new LessThanEqualToExpression(transformExpression(a).get,
-            transformExpression(l).get))
-        case LessThanOrEqual(Cast(a: Attribute, _), l @ Literal(v, t)) =>
-          Some(new LessThanEqualToExpression(transformExpression(a).get,
-            transformExpression(l).get))
-        case LessThanOrEqual(l @ Literal(v, t), a: Attribute) =>
-          Some(new GreaterThanEqualToExpression(transformExpression(a).get,
-            transformExpression(l).get))
-        case LessThanOrEqual(l @ Literal(v, t), Cast(a: Attribute, _)) =>
-          Some(new GreaterThanEqualToExpression(transformExpression(a).get,
-            transformExpression(l).get))
-
-        case AttributeReference(name, dataType, _, _) =>
-          Some(new CarbonColumnExpression(name,
-            CarbonScalaUtil.convertSparkToCarbonDataType(
-              getActualCarbonDataType(name, carbonTable))))
-        case Literal(name, dataType) =>
-          Some(new CarbonLiteralExpression(name,
-            CarbonScalaUtil.convertSparkToCarbonDataType(dataType)))
-        case Cast(left, right) if !left.isInstanceOf[Literal] => transformExpression(left)
-
-        case others =>
-          if (!or) {
-            others.collect {
-              case attr: AttributeReference => attributesNeedToDecode.add(attr)
-            }
-            unprocessedExprs += others
-          }
-          None
-      }
-    }
-    exprs.flatMap(transformExpression(_, false)).reduceOption(new AndExpression(_, _))
-  }
-
-  // The null/instance checks must guard both disjuncts; otherwise a null or
-  // non-Literal expression would be dereferenced by the value check below.
-  private def isNullLiteral(exp: Expression): Boolean = {
-    null != exp && exp.isInstanceOf[Literal] &&
-      ((exp.asInstanceOf[Literal].dataType == org.apache.spark.sql.types.DataTypes.NullType) ||
-       (exp.asInstanceOf[Literal].value == null))
-  }
-
-  private def getActualCarbonDataType(column: String, carbonTable: CarbonTable) = {
-    var carbonColumn: CarbonColumn =
-      carbonTable.getDimensionByName(carbonTable.getFactTableName, column)
-    val dataType = if (carbonColumn != null) {
-      carbonColumn.getDataType
-    } else {
-      carbonColumn = carbonTable.getMeasureByName(carbonTable.getFactTableName, column)
-      carbonColumn.getDataType match {
-        case DataType.INT => DataType.LONG
-        case DataType.LONG => DataType.LONG
-        case DataType.DECIMAL => DataType.DECIMAL
-        case _ => DataType.DOUBLE
-      }
-    }
-    CarbonScalaUtil.convertCarbonToSparkDataType(dataType)
-  }
-
-  // Convert a Scala list to a Java list. scalaList.asJava cannot be used here: while
-  // deserializing, the classes inside the Scala wrapper list are not found and a
-  // ClassNotFoundException is thrown.
-  private def convertToJavaList(
-      scalaList: Seq[CarbonExpression]): java.util.List[CarbonExpression] = {
-    val javaList = new java.util.ArrayList[CarbonExpression]()
-    scalaList.foreach(javaList.add)
-    javaList
-  }
-}
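A note for readers tracing the deleted translation code above: every comparison operator is matched in four shapes (attribute or literal on either side, with and without an enclosing Cast), and the operator is mirrored whenever the literal sits on the left. A minimal, self-contained sketch of that mirroring idea follows; the Expr and CarbonPred types are illustrative stand-ins, not the real Catalyst or Carbon classes.

object FilterFlipSketch {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Lit(value: Int) extends Expr
  final case class GreaterThan(left: Expr, right: Expr) extends Expr

  sealed trait CarbonPred
  final case class Gt(column: String, value: Int) extends CarbonPred
  final case class Lt(column: String, value: Int) extends CarbonPred

  // 'a > 5' keeps its operator; '5 > a' is rewritten to 'a < 5'.
  def translate(e: Expr): Option[CarbonPred] = e match {
    case GreaterThan(Attr(n), Lit(v)) => Some(Gt(n, v))
    case GreaterThan(Lit(v), Attr(n)) => Some(Lt(n, v))
    case _ => None // untranslatable predicates stay with Spark, as in processExpression
  }

  def main(args: Array[String]): Unit = {
    println(translate(GreaterThan(Lit(5), Attr("age")))) // Some(Lt(age,5))
  }
}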
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
deleted file mode 100644
index e8bc97e..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-
-/**
- * Contains all options for Spark data source
- */
-class CarbonOption(options: Map[String, String]) {
-  def tableIdentifier: String = options.getOrElse("tableName", s"$dbName.$tableName")
-
-  def dbName: String = options.getOrElse("dbName", CarbonCommonConstants.DATABASE_DEFAULT_NAME)
-
-  def tableName: String = options.getOrElse("tableName", "default_table")
-
-  def tableId: String = options.getOrElse("tableId", "default_table_id")
-
-  def partitionCount: String = options.getOrElse("partitionCount", "1")
-
-  def partitionClass: String = {
-    options.getOrElse("partitionClass",
-      "org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl")
-  }
-
-  def tempCSV: Boolean = options.getOrElse("tempCSV", "true").toBoolean
-
-  def compress: Boolean = options.getOrElse("compress", "false").toBoolean
-
-  def useKettle: Boolean = options.getOrElse("useKettle", "true").toBoolean
-}
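For context, each CarbonOption accessor resolves lazily against the options map, so a default applies only when the key is absent. A small usage sketch, assuming the class above is on the classpath (the option values here are made up):

val opts = new CarbonOption(Map("tableName" -> "sales", "compress" -> "true"))
opts.tableName       // "sales"
opts.compress        // true
opts.partitionCount  // "1": no entry in the map, so the default applies
opts.tableIdentifier // "sales": the "tableName" entry wins over the s"$dbName.$tableName" fallback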
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
deleted file mode 100644
index 7618558..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark
-
-import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifier}
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.{CarbonDimension, ColumnSchema}
-
-/**
- * Column validator
- */
-trait ColumnValidator {
-  def validateColumns(columns: Seq[ColumnSchema])
-}
-
-/**
- * Dictionary related helper service
- */
-trait DictionaryDetailService {
-  def getDictionaryDetail(dictFolderPath: String, primDimensions: Array[CarbonDimension],
-      table: CarbonTableIdentifier, storePath: String): DictionaryDetail
-}
-
-/**
- * Dictionary related detail
- */
-case class DictionaryDetail(columnIdentifiers: Array[ColumnIdentifier],
-    dictFilePaths: Array[String], dictFileExists: Array[Boolean])
-
-/**
- * Factory class
- */
-object CarbonSparkFactory {
-  /**
-   * @return column validator
-   */
-  def getCarbonColumnValidator(): ColumnValidator = {
-    new CarbonColumnValidator
-  }
-
-  /**
-   * @return dictionary helper
-   */
-  def getDictionaryDetailService(): DictionaryDetailService = {
-    new DictionaryDetailHelper
-  }
-}
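CarbonSparkFactory exposes trait-typed accessors so call sites never name the concrete CarbonColumnValidator or DictionaryDetailHelper classes. The shape of that pattern, reduced to a runnable sketch with stand-in types (none of these are the real Carbon classes):

object FactorySketch {
  trait Validator { def validate(columns: Seq[String]): Unit }

  // the concrete class stays hidden; callers depend on the Validator trait only
  private class SimpleValidator extends Validator {
    def validate(columns: Seq[String]): Unit =
      require(columns.nonEmpty, "no columns to validate")
  }

  def getValidator(): Validator = new SimpleValidator

  def main(args: Array[String]): Unit = {
    getValidator().validate(Seq("id", "name"))
    println("validated")
  }
}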
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
deleted file mode 100644
index 52457b8..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark
-
-import scala.collection.mutable.HashMap
-
-import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifier}
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
-import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath}
-import org.apache.carbondata.core.datastorage.store.filesystem.{CarbonFile, CarbonFileFilter}
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory
-
-class DictionaryDetailHelper extends DictionaryDetailService {
-  def getDictionaryDetail(dictfolderPath: String, primDimensions: Array[CarbonDimension],
-      table: CarbonTableIdentifier, storePath: String): DictionaryDetail = {
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, table)
-    val dictFilePaths = new Array[String](primDimensions.length)
-    val dictFileExists = new Array[Boolean](primDimensions.length)
-    val columnIdentifier = new Array[ColumnIdentifier](primDimensions.length)
-
-    val fileType = FileFactory.getFileType(dictfolderPath)
-    // 1. list all dictionary files in the metadata folder
-    val metadataDirectory = FileFactory.getCarbonFile(dictfolderPath, fileType)
-    val carbonFiles = metadataDirectory.listFiles(new CarbonFileFilter {
-      override def accept(pathname: CarbonFile): Boolean = {
-        CarbonTablePath.isDictionaryFile(pathname)
-      }
-    })
-    // 2. put the dictionary file names into fileNamesMap
-    val fileNamesMap = new HashMap[String, Int]
-    for (i <- 0 until carbonFiles.length) {
-      fileNamesMap.put(carbonFiles(i).getName, i)
-    }
-    // 3. look up fileNamesMap: a dictionary file exists if and only if its name is in the map
-    primDimensions.zipWithIndex.foreach { f =>
-      columnIdentifier(f._2) = f._1.getColumnIdentifier
-      dictFilePaths(f._2) = carbonTablePath.getDictionaryFilePath(f._1.getColumnId)
-      dictFileExists(f._2) =
-        fileNamesMap.get(CarbonTablePath.getDictionaryFileName(f._1.getColumnId)) match {
-          case None => false
-          case Some(_) => true
-        }
-    }
-
-    DictionaryDetail(columnIdentifier, dictFilePaths, dictFileExists)
-  }
-}
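The helper above lists the metadata directory once, indexes the file names in a map, and then answers every per-dimension existence question in memory instead of issuing one file-system probe per dictionary file. The same pattern in isolation, as a runnable sketch with plain strings standing in for CarbonFile and CarbonDimension:

object ExistenceLookupSketch {
  def main(args: Array[String]): Unit = {
    val listedFiles = Seq("col1.dict", "col3.dict")           // one directory listing
    val fileNames = listedFiles.toSet                         // O(1) membership checks
    val wanted = Seq("col1.dict", "col2.dict", "col3.dict")   // one name per dimension
    val exists = wanted.map(fileNames.contains)               // no further I/O
    println(wanted.zip(exists)) // List((col1.dict,true), (col2.dict,false), (col3.dict,true))
  }
}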
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/KeyVal.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
deleted file mode 100644
index 254052b..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * It is just a key-value class. I couldn't find any other alternative to make the RDD
- * classes work, given my minimal knowledge of Scala.
- * Maybe I will remove it later, once I gain more knowledge :)
- */
-
-package org.apache.carbondata.spark
-
-import org.apache.carbondata.core.load.LoadMetadataDetails
-
-trait Value[V] extends Serializable {
-  def getValue(value: Array[Object]): V
-}
-
-class ValueImpl extends Value[Array[Object]] {
-  override def getValue(value: Array[Object]): Array[Object] = value
-}
-
-trait RawValue[V] extends Serializable {
-  def getValue(value: Array[Any]): V
-}
-
-class RawValueImpl extends RawValue[Array[Any]] {
-  override def getValue(value: Array[Any]): Array[Any] = value
-}
-
-trait DataLoadResult[K, V] extends Serializable {
-  def getKey(key: String, value: LoadMetadataDetails): (K, V)
-}
-
-class DataLoadResultImpl extends DataLoadResult[String, LoadMetadataDetails] {
-  override def getKey(key: String, value: LoadMetadataDetails): (String, LoadMetadataDetails) = {
-    (key, value)
-  }
-}
-
-trait PartitionResult[K, V] extends Serializable {
-  def getKey(key: Int, value: Boolean): (K, V)
-}
-
-class PartitionResultImpl extends PartitionResult[Int, Boolean] {
-  override def getKey(key: Int, value: Boolean): (Int, Boolean) = (key, value)
-}
-
-trait MergeResult[K, V] extends Serializable {
-  def getKey(key: Int, value: Boolean): (K, V)
-}
-
-class MergeResultImpl extends MergeResult[Int, Boolean] {
-  override def getKey(key: Int, value: Boolean): (Int, Boolean) = (key, value)
-}
-
-trait DeletedLoadResult[K, V] extends Serializable {
-  def getKey(key: String, value: String): (K, V)
-}
-
-class DeletedLoadResultImpl extends DeletedLoadResult[String, String] {
-  override def getKey(key: String, value: String): (String, String) = (key, value)
-}
-
-trait RestructureResult[K, V] extends Serializable {
-  def getKey(key: Int, value: Boolean): (K, V)
-}
-
-class RestructureResultImpl extends RestructureResult[Int, Boolean] {
-  override def getKey(key: Int, value: Boolean): (Int, Boolean) = (key, value)
-}
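Each trait above fixes the key and value types for one kind of RDD result, and every Impl simply pairs its arguments into a tuple; keeping them as small serializable classes is what lets the RDDs ship them across the cluster. Adding another result type follows the same two-step recipe; a hypothetical CompactionResult, shown only as an illustration:

trait CompactionResult[K, V] extends Serializable {
  def getKey(key: String, value: Boolean): (K, V)
}

class CompactionResultImpl extends CompactionResult[String, Boolean] {
  override def getKey(key: String, value: Boolean): (String, Boolean) = (key, value)
}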
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala
deleted file mode 100644
index 551fc9c..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.databricks.spark.sql.readers
-
-/**
- * Parser for parsing lines in bulk. Use this when efficiency is desired.
- *
- * @param iter iterator over lines in the file
- * @param fieldSep the delimiter used to separate fields in a line
- * @param lineSep the delimiter used to separate lines
- * @param quote character used to quote fields
- * @param escape character used to escape the quote character
- * @param ignoreLeadingSpace ignore white space before a field
- * @param ignoreTrailingSpace ignore white space after a field
- * @param headers headers for the columns
- * @param inputBufSize size of buffer to use for parsing input, tune for performance
- * @param maxCols maximum number of columns allowed, for safety against bad inputs
- */
-class CarbonBulkCsvReader(iter: Iterator[String],
-    split: Int,
-    fieldSep: Char = ',',
-    lineSep: String = "\n",
-    quote: Char = '"',
-    escape: Char = '\\',
-    commentMarker: Char = '#',
-    ignoreLeadingSpace: Boolean = true,
-    ignoreTrailingSpace: Boolean = true,
-    headers: Seq[String],
-    inputBufSize: Int = 128,
-    maxCols: Int = 20480)
-  extends CsvReader(fieldSep,
-    lineSep,
-    quote,
-    escape,
-    commentMarker,
-    ignoreLeadingSpace,
-    ignoreTrailingSpace,
-    headers,
-    inputBufSize,
-    maxCols)
-  with Iterator[Array[String]] {
-
-  private val reader = new CarbonStringIteratorReader(iter)
-  parser.beginParsing(reader)
-  private var nextRecord = parser.parseNext()
-
-  /**
-   * Get the next parsed line.
-   *
-   * @return array of strings where each string is a field in the CSV record
-   */
-  def next: Array[String] = {
-    val curRecord = nextRecord
-    if (curRecord != null) {
-      nextRecord = parser.parseNext()
-    } else {
-      throw new NoSuchElementException("next record is null")
-    }
-    curRecord
-  }
-
-  def hasNext: Boolean = nextRecord != null
-}
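CarbonBulkCsvReader adapts the pull-style Univocity parser, whose parseNext() returns null once input is exhausted, to Scala's Iterator by keeping one record of lookahead: the constructor parses the first record, hasNext tests the buffer, and next returns the buffered record while fetching the following one. The same adapter in isolation, with a plain () => A fetch function standing in for parser.parseNext():

// One-record-lookahead adapter around a pull-style source that signals
// end of input by returning null.
class LookaheadIterator[A >: Null](fetch: () => A) extends Iterator[A] {
  private var buffered: A = fetch() // prime the lookahead

  override def hasNext: Boolean = buffered != null

  override def next(): A = {
    if (buffered == null) throw new NoSuchElementException("next record is null")
    val current = buffered
    buffered = fetch() // refill for the following call
    current
  }
}

object LookaheadIteratorSketch {
  def main(args: Array[String]): Unit = {
    val source = Iterator("a", "b", "c")
    val it = new LookaheadIterator[String](() => if (source.hasNext) source.next() else null)
    println(it.toList) // List(a, b, c)
  }
}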
-
-/**
- * A Reader that "reads" from a sequence of lines. Spark's textFile method removes the
- * newline at the end of each line; the Univocity parser, however, requires a Reader
- * that provides access to the data to be parsed with the newlines present.
- *
- * @param iter iterator over RDD[String]
- */
-private class CarbonStringIteratorReader(val iter: Iterator[String]) extends java.io.Reader {
-
-  private var next: Long = 0
-  private var length: Long = 0 // length of input so far
-  private var start: Long = 0
-  private var str: String = null // current string from iter
-
-  /**
-   * Fetch the next string from iter if done with the current one, and
-   * pretend there is a newline at the end of every string we get from iter.
-   */
-  private def refill(): Unit = {
-    if (length == next) {
-      if (iter.hasNext) {
-        str = iter.next
-        start = length
-        if (iter.hasNext) {
-          // count one extra character per line (except the last) to account for
-          // the '\n' removed by SparkContext.textFile()
-          length += (str.length + 1)
-        } else {
-          length += str.length
-        }
-      } else {
-        str = null
-      }
-    }
-  }
-
-  /**
-   * Read the next character; if at the end of a string, pretend there is a newline.
-   */
-  override def read(): Int = {
-    refill()
-    if (next >= length) {
-      -1
-    } else {
-      val cur = next - start
-      next += 1
-      if (cur == str.length) '\n' else str.charAt(cur.toInt)
-    }
-  }
-
-  /**
-   * Read from str into cbuf.
-   */
-  def read(cbuf: Array[Char], off: Int, len: Int): Int = {
-    refill()
-    var n = 0
-    if ((off < 0) || (off > cbuf.length) || (len < 0) ||
-        ((off + len) > cbuf.length) || ((off + len) < 0)) {
-      throw new IndexOutOfBoundsException()
-    } else if (len == 0) {
-      n = 0
-    } else {
-      if (next >= length) { // end of input
-        n = -1
-      } else {
-        n = Math.min(length - next, len).toInt // lesser of available input or buffer size
-        // add a '\n' to every line except the last one
-        if (n == length - next && iter.hasNext) {
-          str.getChars((next - start).toInt, (next - start + n - 1).toInt, cbuf, off)
-          cbuf(off + n - 1) = '\n'
-        } else {
-          str.getChars((next - start).toInt, (next - start + n).toInt, cbuf, off)
-        }
-        next += n
-        if (n < len) {
-          val m = read(cbuf, off + n, len - n) // more space left, fetch more input from iter
-          if (m != -1) n += m
-        }
-      }
-    }
-    n
-  }
-
-  override def skip(ns: Long): Long = {
-    throw new IllegalArgumentException("Skip not implemented")
-  }
-
-  override def ready: Boolean = {
-    refill()
-    true
-  }
-
-  override def markSupported: Boolean = false
-
-  override def mark(readAheadLimit: Int): Unit = {
-    throw new IllegalArgumentException("Mark not implemented")
-  }
-
-  override def reset(): Unit = {
-    throw new IllegalArgumentException("Mark and hence reset not implemented")
-  }
-
-  def close(): Unit = { }
-}
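The essential trick in CarbonStringIteratorReader is replaying each line with a synthetic trailing '\n', because SparkContext.textFile() strips newlines while the Univocity parser needs to see them. A simplified, runnable variant of the same trick (unlike the class above, it appends '\n' after every line, including the last), driven through a BufferedReader:

import java.io.{BufferedReader, Reader}

// Replays each line from the iterator followed by a synthetic '\n'.
class LineIteratorReader(iter: Iterator[String]) extends Reader {
  private var current: String = null
  private var pos = 0

  override def read(cbuf: Array[Char], off: Int, len: Int): Int = {
    if (current == null || pos > current.length) { // current line (plus '\n') fully emitted
      if (!iter.hasNext) return -1
      current = iter.next()
      pos = 0
    }
    var n = 0
    while (n < len && pos <= current.length) {
      cbuf(off + n) = if (pos == current.length) '\n' else current.charAt(pos)
      pos += 1
      n += 1
    }
    n
  }

  override def close(): Unit = ()
}

object LineIteratorReaderSketch {
  def main(args: Array[String]): Unit = {
    val reader = new BufferedReader(new LineIteratorReader(Iterator("a,b", "c,d")))
    println(reader.readLine()) // a,b
    println(reader.readLine()) // c,d
  }
}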