carbondata-issues mailing list archives

From jackylk <...@git.apache.org>
Subject [GitHub] incubator-carbondata pull request #412: [CARBONDATA-519]Added vector reader ...
Date Mon, 19 Dec 2016 14:47:40 GMT
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/412#discussion_r93044529
  
    --- Diff: integration/spark2/src/main/java/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java ---
    @@ -0,0 +1,248 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.carbondata.spark.vectorreader;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.carbondata.core.cache.dictionary.Dictionary;
    +import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
    +import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
    +import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
    +import org.apache.carbondata.core.util.CarbonUtil;
    +import org.apache.carbondata.hadoop.CarbonInputSplit;
    +import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
    +import org.apache.carbondata.scan.executor.QueryExecutor;
    +import org.apache.carbondata.scan.executor.QueryExecutorFactory;
    +import org.apache.carbondata.scan.executor.exception.QueryExecutionException;
    +import org.apache.carbondata.scan.model.QueryDimension;
    +import org.apache.carbondata.scan.model.QueryMeasure;
    +import org.apache.carbondata.scan.model.QueryModel;
    +import org.apache.carbondata.scan.result.iterator.AbstractDetailQueryResultIterator;
    +import org.apache.carbondata.scan.result.vector.CarbonColumnVector;
    +import org.apache.carbondata.scan.result.vector.CarbonColumnarBatch;
    +import org.apache.carbondata.spark.util.CarbonScalaUtil;
    +
    +import org.apache.hadoop.mapreduce.InputSplit;
    +import org.apache.hadoop.mapreduce.RecordReader;
    +import org.apache.hadoop.mapreduce.TaskAttemptContext;
    +import org.apache.spark.memory.MemoryMode;
    +import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
    +import org.apache.spark.sql.types.DecimalType;
    +import org.apache.spark.sql.types.StructField;
    +import org.apache.spark.sql.types.StructType;
    +
    +public class VectorizedCarbonRecordReader extends RecordReader<Void, Object> {
    +
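    +  // Index of the next row to return from the current batch (row mode).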
    +  private int batchIdx = 0;
    +
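    +  // Number of rows held by the current batch (row mode).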
    +  private int numBatched = 0;
    +
    +  private ColumnarBatch columnarBatch;
    +
    +  private CarbonColumnarBatch carbonColumnarBatch;
    +
    +  /**
    +   * If true, this class returns batches instead of rows.
    +   */
    +  private boolean returnColumnarBatch;
    +
    +  /**
    +   * The default config on whether columnarBatch should be offheap.
    +   */
    +  private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP;
    +
    +  private QueryModel queryModel;
    +
    +  private AbstractDetailQueryResultIterator iterator;
    +
    +  private QueryExecutor queryExecutor;
    +
    +  public VectorizedCarbonRecordReader(QueryModel queryModel) {
    +    this.queryModel = queryModel;
    +    enableReturningBatches();
    +  }
    +
    +  /**
    +   * Implementation of RecordReader API.
    +   */
    +  @Override public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
    +      throws IOException, InterruptedException, UnsupportedOperationException {
    +    // The input split can contain a single HDFS block or multiple blocks, so first get all
    +    // the blocks and then set them in the query model.
    +    List<CarbonInputSplit> splitList;
    +    if (inputSplit instanceof CarbonInputSplit) {
    +      splitList = new ArrayList<>(1);
    +      splitList.add((CarbonInputSplit) inputSplit);
    +    } else if (inputSplit instanceof CarbonMultiBlockSplit) {
    +      // Contains multiple blocks; this is an optimization for concurrent queries.
    +      CarbonMultiBlockSplit multiBlockSplit = (CarbonMultiBlockSplit) inputSplit;
    +      splitList = multiBlockSplit.getAllSplits();
    +    } else {
    +      throw new RuntimeException("unsupported input split type: " + inputSplit);
    +    }
    +    List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
    +    queryModel.setTableBlockInfos(tableBlockInfoList);
    +    queryModel.setVectorReader(true);
    +    try {
    +      queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
    +      iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel);
    +    } catch (QueryExecutionException e) {
    +      throw new InterruptedException(e.getMessage());
    +    }
    +  }
    +
    +  @Override public void close() throws IOException {
    +    if (columnarBatch != null) {
    +      columnarBatch.close();
    +      columnarBatch = null;
    +    }
    +    // clear dictionary cache
    +    Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
    +    if (null != columnToDictionaryMapping) {
    +      for (Map.Entry<String, Dictionary> entry : columnToDictionaryMapping.entrySet()) {
    +        CarbonUtil.clearDictionaryCache(entry.getValue());
    +      }
    +    }
    +    try {
    +      queryExecutor.finish();
    +    } catch (QueryExecutionException e) {
    +      throw new IOException(e);
    +    }
    +  }
    +
    +  @Override public boolean nextKeyValue() throws IOException, InterruptedException {
    +    resultBatch();
    +
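    +    // Batch mode: each call hands the caller a whole ColumnarBatch.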
    +    if (returnColumnarBatch) return nextBatch();
    +
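    +    // Row mode: step through the current batch, loading the next batch
    +    // once this one is exhausted.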
    +    if (batchIdx >= numBatched) {
    +      if (!nextBatch()) return false;
    +    }
    +    ++batchIdx;
    +    return true;
    +  }
    +
    +  @Override public Object getCurrentValue() throws IOException, InterruptedException {
    +    if (returnColumnarBatch) return columnarBatch;
    +    return columnarBatch.getRow(batchIdx - 1);
    +  }
    +
    +  @Override public Void getCurrentKey() throws IOException, InterruptedException {
    +    return null;
    +  }
    +
    +  @Override public float getProgress() throws IOException, InterruptedException {
    +    // TODO: implement based on the total number of rows it is going to retrieve.
    +    return 0;
    +  }
    +
    +  /**
    +   * Returns the ColumnarBatch object that will be used for all rows returned by this reader.
    +   * This object is reused. Calling this enables the vectorized reader. This should be called
    +   * before any calls to nextKeyValue/nextBatch.
    +   */
    +  public void initBatch(MemoryMode memMode) {
    --- End diff ---
    
    Please check the visibility of each function in this class; many of them do not need to be public.
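
    As a rough sketch of what the suggestion could look like (a toy skeleton with stub
    bodies, not the actual patch): the RecordReader overrides must stay public because
    the Hadoop API declares them, while helpers like initBatch and enableReturningBatches
    could become package-private or private. The call sites assumed in the comments below
    are hypothetical and would need to be checked against the real integration module.

        import java.io.IOException;

        import org.apache.hadoop.mapreduce.InputSplit;
        import org.apache.hadoop.mapreduce.RecordReader;
        import org.apache.hadoop.mapreduce.TaskAttemptContext;

        // Toy skeleton illustrating one possible visibility split.
        public class VectorizedCarbonRecordReader extends RecordReader<Void, Object> {

          // Must stay public: declared by the RecordReader API.
          @Override public void initialize(InputSplit split, TaskAttemptContext context)
              throws IOException, InterruptedException {
          }

          @Override public boolean nextKeyValue() throws IOException, InterruptedException {
            return false;
          }

          @Override public Object getCurrentValue() {
            return null;
          }

          @Override public Void getCurrentKey() {
            return null;
          }

          @Override public float getProgress() {
            return 0;
          }

          @Override public void close() throws IOException {
          }

          // Package-private: assumed to be called only by glue code in the
          // same package (hypothetical call site).
          void initBatch() {
          }

          // Private: internal helper with no callers outside this class.
          private void enableReturningBatches() {
          }
        }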


