carbondata-issues mailing list archives

From sounakr <...@git.apache.org>
Subject [GitHub] incubator-carbondata pull request #492: [CARBONDATA-440] Providing the updat...
Date Fri, 06 Jan 2017 11:12:50 GMT
Github user sounakr commented on a diff in the pull request:

    https://github.com/apache/incubator-carbondata/pull/492#discussion_r94932078
  
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java ---
    @@ -358,66 +372,176 @@ private Expression getFilterPredicates(Configuration configuration) {
        * @return list of table block
        * @throws IOException
        */
    -  private List<TableBlockInfo> getTableBlockInfo(JobContext job, String segmentId)
    -      throws IOException {
    +  private List<TableBlockInfo> getTableBlockInfo(JobContext job,
    +      TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier,
    +      Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys,
    +      List<String> updatedTaskList,
    +      UpdateVO updateDetails,
    +      SegmentUpdateStatusManager updateStatusManager,
    +      String segmentId)
    +    throws IOException {
         List<TableBlockInfo> tableBlockInfoList = new ArrayList<TableBlockInfo>();
     
         // get file location of all files of given segment
         JobContext newJob =
             new JobContextImpl(new Configuration(job.getConfiguration()), job.getJobID());
    -    newJob.getConfiguration().set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, segmentId + "");
    +    newJob.getConfiguration().set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS,
    +        tableSegmentUniqueIdentifier.getSegmentId() + "");
     
         // identify table blocks
         for (InputSplit inputSplit : getSplitsInternal(newJob)) {
           CarbonInputSplit carbonInputSplit = (CarbonInputSplit) inputSplit;
    -      BlockletInfos blockletInfos = new BlockletInfos(carbonInputSplit.getNumberOfBlocklets(), 0,
    -          carbonInputSplit.getNumberOfBlocklets());
    -      tableBlockInfoList.add(
    -          new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
    -              segmentId, carbonInputSplit.getLocations(), carbonInputSplit.getLength(),
    -              blockletInfos, carbonInputSplit.getVersion()));
    +      // if the block name and the updated block name are the same, then compare its
    +      // timestamp with the tableSegmentUniqueIdentifier timestamp; if the timestamp is
    +      // greater, then add it as a TableBlockInfo object.
    +      if (isValidBlockBasedOnUpdateDetails(taskKeys, carbonInputSplit, updateDetails,
    +          updateStatusManager, segmentId)) {
    +        BlockletInfos blockletInfos = new BlockletInfos(carbonInputSplit.getNumberOfBlocklets(), 0,
    +            carbonInputSplit.getNumberOfBlocklets());
    +        tableBlockInfoList.add(
    +            new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(),
    +                tableSegmentUniqueIdentifier.getSegmentId(), carbonInputSplit.getLocations(),
    +                carbonInputSplit.getLength(), blockletInfos, carbonInputSplit.getVersion(),
    +                carbonInputSplit.getBlockStorageIdMap()));
    +      }
         }
         return tableBlockInfoList;
       }
     
    +  private boolean isValidBlockBasedOnUpdateDetails(
    +      Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys, CarbonInputSplit carbonInputSplit,
    +      UpdateVO updateDetails, SegmentUpdateStatusManager updateStatusManager, String segmentId) {
    +    String taskID = null;
    +    if (null != carbonInputSplit) {
    +      if (!updateStatusManager.isBlockValid(segmentId, carbonInputSplit.getPath().getName())) {
    +        return false;
    +      }
    +
    +      if (null == taskKeys) {
    +        return true;
    +      }
    +
    +      taskID = CarbonTablePath.DataFileUtil.getTaskNo(carbonInputSplit.getPath().getName());
    +      String bucketNo =
    +          CarbonTablePath.DataFileUtil.getBucketNo(carbonInputSplit.getPath().getName());
    +
    +      SegmentTaskIndexStore.TaskBucketHolder taskBucketHolder =
    +          new SegmentTaskIndexStore.TaskBucketHolder(taskID, bucketNo);
    +
    +      String blockTimestamp = carbonInputSplit.getPath().getName()
    +          .substring(carbonInputSplit.getPath().getName().lastIndexOf('-') + 1,
    +              carbonInputSplit.getPath().getName().lastIndexOf('.'));
    +      if (!(updateDetails.getUpdateDeltaStartTimestamp() != null
    +          && Long.parseLong(blockTimestamp) < updateDetails.getUpdateDeltaStartTimestamp())) {
    +        if (!taskKeys.contains(taskBucketHolder)) {
    +          return true;
    +        }
    +      }
    +    }
    +    return false;
    +  }
    +
       /**
        * It returns index for each task file.
        * @param job
        * @param absoluteTableIdentifier
        * @param segmentId
        * @return
        * @throws IOException
    +   * @throws IndexBuilderException
        */
       private Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> getSegmentAbstractIndexs(
           JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId,
    -      CacheClient cacheClient) throws IOException {
    +      CacheClient cacheClient, SegmentUpdateStatusManager updateStatusManager) throws
IOException {
         Map<SegmentTaskIndexStore.TaskBucketHolder, AbstractIndex> segmentIndexMap = null;
    +    SegmentTaskIndexWrapper segmentTaskIndexWrapper = null;
    +    List<String> updatedTaskList = null;
    +    boolean isSegmentUpdated = false;
    +    Set<SegmentTaskIndexStore.TaskBucketHolder> taskKeys = null;
         TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier =
             new TableSegmentUniqueIdentifier(absoluteTableIdentifier, segmentId);
    -    SegmentTaskIndexWrapper segmentTaskIndexWrapper = (SegmentTaskIndexWrapper)
    +    SegmentStatusManager statusManager = new SegmentStatusManager(absoluteTableIdentifier);
    +    segmentTaskIndexWrapper =
             cacheClient.getSegmentAccessClient().getIfPresent(tableSegmentUniqueIdentifier);
    +    UpdateVO updateDetails = updateStatusManager.getInvalidTimestampRange(segmentId);
         if (null != segmentTaskIndexWrapper) {
           segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
    +      if (isSegmentUpdate(segmentTaskIndexWrapper, updateDetails)) {
    +        taskKeys = segmentIndexMap.keySet();
    +        isSegmentUpdated = true;
    +        updatedTaskList =
    +            statusManager.getUpdatedTasksDetailsForSegment(segmentId, updateStatusManager);
    +      }
         }
    -
         // if segment tree is not loaded, load the segment tree
    -    if (segmentIndexMap == null) {
    -      // List<FileStatus> fileStatusList = new LinkedList<FileStatus>();
    -      List<TableBlockInfo> tableBlockInfoList = getTableBlockInfo(job, segmentId);
    -      // getFileStatusOfSegments(job, new int[]{ segmentId }, fileStatusList);
    -
    -      Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>();
    -      segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList);
    -
    -      // get Btree blocks for given segment
    -      tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
    -      segmentTaskIndexWrapper =
    -          cacheClient.getSegmentAccessClient().get(tableSegmentUniqueIdentifier);
    -      segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
    +    if (segmentIndexMap == null || isSegmentUpdated) {
    +      // if the segment is updated, only the TableBlockInfo instances of the updated blocks
    +      // have to be retrieved. These are then filtered based on taskKeys; if the task is the
    +      // same for the block, don't add it, since its btree is already loaded.
    +      List<TableBlockInfo> tableBlockInfoList =
    +          getTableBlockInfo(job, tableSegmentUniqueIdentifier, taskKeys, updatedTaskList,
    +              updateStatusManager.getInvalidTimestampRange(segmentId), updateStatusManager,
    +              segmentId);
    +      if (!tableBlockInfoList.isEmpty()) {
    +        // getFileStatusOfSegments(job, new int[]{ segmentId }, fileStatusList);
    +        Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos = new HashMap<>();
    +        segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList);
    +        // get Btree blocks for given segment
    +        tableSegmentUniqueIdentifier.setSegmentToTableBlocksInfos(segmentToTableBlocksInfos);
    +        tableSegmentUniqueIdentifier.setIsSegmentUpdated(isSegmentUpdated);
    +        segmentTaskIndexWrapper =
    +            cacheClient.getSegmentAccessClient().get(tableSegmentUniqueIdentifier);
    +        segmentIndexMap = segmentTaskIndexWrapper.getTaskIdToTableSegmentMap();
    +      }
         }
         return segmentIndexMap;
       }
     
    +  public BlockMappingVO getBlockRowCount(JobContext job,
    --- End diff ---
    
    Done
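
The check in isValidBlockBasedOnUpdateDetails() boils down to two conditions: the timestamp embedded in the block file name must not be older than the segment's update-delta start time, and the block's task/bucket pair must not already be present in the cached index. Below is a minimal, self-contained sketch of that check; the part-<taskNo>-<bucketNo>-<timestamp>.carbondata file-name layout and the plain string task/bucket key are simplifying assumptions for illustration, not CarbonData's actual CarbonTablePath.DataFileUtil or TaskBucketHolder.

    // BlockFilterSketch.java -- illustrative sketch of the filtering idea in
    // isValidBlockBasedOnUpdateDetails(): reload a block only if it is not
    // older than the update-delta start and its task/bucket key is not
    // already indexed. File-name layout and key format are assumptions.
    import java.util.HashSet;
    import java.util.Set;

    public class BlockFilterSketch {

      // timestamp is assumed to sit between the last '-' and the extension
      static long blockTimestamp(String fileName) {
        return Long.parseLong(
            fileName.substring(fileName.lastIndexOf('-') + 1, fileName.lastIndexOf('.')));
      }

      // true when the block should be (re)loaded: nothing cached yet, or the
      // block is newer than the update-delta start and its task/bucket key is
      // not among the already indexed ones
      static boolean isValidBlock(String fileName, String taskBucketKey,
          Long updateDeltaStartTimestamp, Set<String> cachedTaskKeys) {
        if (cachedTaskKeys == null) {
          return true;                          // first load: take every block
        }
        boolean olderThanUpdate = updateDeltaStartTimestamp != null
            && blockTimestamp(fileName) < updateDeltaStartTimestamp;
        return !olderThanUpdate && !cachedTaskKeys.contains(taskBucketKey);
      }

      public static void main(String[] args) {
        // hypothetical names: part-<taskNo>-<bucketNo>-<timestamp>.carbondata
        Set<String> cached = new HashSet<>();
        cached.add("0-0");

        // task/bucket already indexed -> skipped
        System.out.println(isValidBlock("part-0-0-1483699000000.carbondata",
            "0-0", 1483698000000L, cached));    // false
        // block written by the update under a new task -> reloaded
        System.out.println(isValidBlock("part-1-0-1483699000000.carbondata",
            "1-0", 1483698000000L, cached));    // true
      }
    }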
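
The surrounding change in getSegmentAbstractIndexs() follows a load-or-refresh pattern: reuse the cached task-to-index map when the segment is unchanged, and when the segment has been updated, pass the already cached task keys down so that only blocks from new or updated tasks are indexed again. A rough sketch of that pattern, using a plain HashMap as a stand-in for the CacheClient and a placeholder TaskIndex in place of AbstractIndex (both assumptions, not the real API):

    // SegmentIndexCacheSketch.java -- stand-in for the cache logic in
    // getSegmentAbstractIndexs(); the Map-based cache, TaskIndex placeholder
    // and file-name parsing are assumptions, not CarbonData's CacheClient.
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public class SegmentIndexCacheSketch {

      // placeholder for the per-task btree index (AbstractIndex in CarbonData)
      static final class TaskIndex { }

      private final Map<String, Map<String, TaskIndex>> cache = new HashMap<>();

      Map<String, TaskIndex> getSegmentIndex(String segmentId, boolean segmentUpdated,
          List<String> blockFileNames) {
        Map<String, TaskIndex> indexMap = cache.get(segmentId);

        if (indexMap != null && !segmentUpdated) {
          return indexMap;                          // segment unchanged: reuse as is
        }

        // keep the task keys that are already indexed so they are not rebuilt
        Set<String> cachedTaskKeys =
            indexMap == null ? null : new HashSet<>(indexMap.keySet());
        if (indexMap == null) {
          indexMap = new HashMap<>();
        }

        for (String fileName : blockFileNames) {
          String taskKey = fileName.split("-")[1];  // assumed part-<taskNo>-... layout
          if (cachedTaskKeys == null || !cachedTaskKeys.contains(taskKey)) {
            indexMap.put(taskKey, new TaskIndex()); // index only new/updated tasks
          }
        }
        cache.put(segmentId, indexMap);
        return indexMap;
      }

      public static void main(String[] args) {
        SegmentIndexCacheSketch sketch = new SegmentIndexCacheSketch();
        List<String> blocks = Arrays.asList(
            "part-0-0-1483699000000.carbondata", "part-1-0-1483699100000.carbondata");
        // first call builds the index; the second, with segmentUpdated=true,
        // only adds tasks that were not indexed before
        System.out.println(sketch.getSegmentIndex("2", false, blocks).size()); // 2
        System.out.println(sketch.getSegmentIndex("2", true, blocks).size());  // still 2
      }
    }

In the actual change, the refreshed wrapper is fetched back through cacheClient.getSegmentAccessClient().get(tableSegmentUniqueIdentifier), and the identifier is marked with setIsSegmentUpdated(isSegmentUpdated) before that lookup.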


