carbondata-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jackylk <...@git.apache.org>
Subject [GitHub] carbondata pull request #1808: [CARBONDATA-2023][DataLoad] Add size base blo...
Date Fri, 09 Feb 2018 01:45:05 GMT
Github user jackylk commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/1808#discussion_r167121062
  
    --- Diff: processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
---
    @@ -606,122 +609,296 @@ private static void createTaskListForNode(Map<String, List<List<Distributable>>>
       }
     
       /**
    -   * If any left over data blocks are present then assign those to nodes in round robin way.
    +   * If any left over data blocks are present then assign those to nodes in round robin way. This
    +   * will not obey the data locality.
        *
        * @param outputMap
    -   * @param uniqueBlocks
    +   * @param leftOverBlocks
        */
    -  private static void assignLeftOverBlocks(Map<String, List<Distributable>> outputMap,
    -      Set<Distributable> uniqueBlocks, int noOfBlocksPerNode, List<String> activeNodes) {
    +  private static void assignLeftOverBlocks(ArrayList<NodeMultiBlockRelation> outputMap,
    +      Set<Distributable> leftOverBlocks, long expectedSizePerNode, List<String> activeNodes,
    +      BlockAssignmentStrategy blockAssignmentStrategy) {
    +    Map<String, Integer> node2Idx = new HashMap<>(outputMap.size());
    +    for (int idx = 0; idx < outputMap.size(); idx++) {
    +      node2Idx.put(outputMap.get(idx).getNode(), idx);
    +    }
     
    +    // iterate all the nodes and try to allocate blocks to the nodes
         if (activeNodes != null) {
           for (String activeNode : activeNodes) {
    -        List<Distributable> blockLst = outputMap.get(activeNode);
    -        if (null == blockLst) {
    +        if (LOGGER.isDebugEnabled()) {
    +          LOGGER.debug("Second assignment iteration: assign for executor: " + activeNode);
    +        }
    +
    +        Integer idx;
    +        List<Distributable> blockLst;
    +        if (node2Idx.containsKey(activeNode)) {
    +          idx = node2Idx.get(activeNode);
    +          blockLst = outputMap.get(idx).getBlocks();
    +        } else {
    +          idx = node2Idx.size();
               blockLst = new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
             }
    -        populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
    -        if (blockLst.size() > 0) {
    -          outputMap.put(activeNode, blockLst);
    +        populateBlocks(leftOverBlocks, expectedSizePerNode, blockLst, blockAssignmentStrategy);
    +
    +        if (!node2Idx.containsKey(activeNode) && blockLst.size() > 0) {
    +          outputMap.add(idx, new NodeMultiBlockRelation(activeNode, blockLst));
    +          node2Idx.put(activeNode, idx);
             }
           }
         } else {
    -      for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
    -        List<Distributable> blockLst = entry.getValue();
    -        populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
    +      for (NodeMultiBlockRelation entry : outputMap) {
    +        List<Distributable> blockLst = entry.getBlocks();
    +        populateBlocks(leftOverBlocks, expectedSizePerNode, blockLst, blockAssignmentStrategy);
           }
    -
         }
     
    -    for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
    -      Iterator<Distributable> blocks = uniqueBlocks.iterator();
    -      if (blocks.hasNext()) {
    -        Distributable block = blocks.next();
    -        List<Distributable> blockLst = entry.getValue();
    -        blockLst.add(block);
    -        blocks.remove();
    -      }
    -    }
    +    // if there is still blocks left, allocate them in round robin manner to each nodes
    +    assignBlocksUseRoundRobin(outputMap, leftOverBlocks, blockAssignmentStrategy);
       }
     
       /**
    -   * The method populate the blockLst to be allocate to a specific node.
    -   * @param uniqueBlocks
    -   * @param noOfBlocksPerNode
    +   * assign blocks to nodes
    +   * @param remainingBlocks
    +   * @param expectedSizePerNode
    +   * @param blockLst
    +   * @param blockAssignmentStrategy
    +   */
    +  private static void populateBlocks(Set<Distributable> remainingBlocks,
    +      long expectedSizePerNode, List<Distributable> blockLst,
    +      BlockAssignmentStrategy blockAssignmentStrategy) {
    +    switch (blockAssignmentStrategy) {
    +      case BLOCK_NUM_FIRST:
    +        populateBlocksByNum(remainingBlocks, expectedSizePerNode, blockLst);
    +        break;
    +      case BLOCK_SIZE_FIRST:
    +        populateBlocksBySize(remainingBlocks, expectedSizePerNode, blockLst);
    +        break;
    +      default:
    +        throw new IllegalArgumentException(
    +            "Unsupported block assignment strategy: " + blockAssignmentStrategy);
    +    }
    +  }
    +  /**
    +   * allocate blocks by block num
    +   * @param remainingBlocks
    +   * @param expectedSizePerNode
        * @param blockLst
        */
    -  private static void populateBlocks(Set<Distributable> uniqueBlocks, int noOfBlocksPerNode,
    -      List<Distributable> blockLst) {
    -    Iterator<Distributable> blocks = uniqueBlocks.iterator();
    -    //if the node is already having the per block nodes then avoid assign the extra blocks
    -    if (blockLst.size() == noOfBlocksPerNode) {
    +  private static void populateBlocksByNum(Set<Distributable> remainingBlocks,
    +      long expectedSizePerNode, List<Distributable> blockLst) {
    +    Iterator<Distributable> blocks = remainingBlocks.iterator();
    +    // if the node is already having the per block nodes then avoid assign the extra blocks
    +    if (blockLst.size() == expectedSizePerNode) {
           return;
         }
         while (blocks.hasNext()) {
           Distributable block = blocks.next();
           blockLst.add(block);
           blocks.remove();
    -      if (blockLst.size() >= noOfBlocksPerNode) {
    +      if (blockLst.size() >= expectedSizePerNode) {
    +        break;
    +      }
    +    }
    +  }
    +
    +  /**
    +   * allocate blocks by block size
    +   * @param remainingBlocks
    +   * @param expectedSizePerNode
    +   * @param blockLst
    +   */
    +  private static void populateBlocksBySize(Set<Distributable> remainingBlocks,
    +      long expectedSizePerNode, List<Distributable> blockLst) {
    +    Iterator<Distributable> blocks = remainingBlocks.iterator();
    +    //if the node is already having the avg node size then avoid assign the extra blocks
    +    long fileSize = 0;
    +    for (Distributable block : blockLst) {
    +      fileSize += ((TableBlockInfo) block).getBlockLength();
    +    }
    +    if (fileSize >= expectedSizePerNode) {
    +      LOGGER.debug("Capacity is full, skip allocate blocks on this node");
    +      return;
    +    }
    +
    +    while (blocks.hasNext()) {
    +      Distributable block = blocks.next();
    +      long thisBlockSize = ((TableBlockInfo) block).getBlockLength();
    +      if (fileSize < expectedSizePerNode) {
    +        // `fileSize==0` means there are no blocks assigned to this node before
    +        if (fileSize == 0 || fileSize + thisBlockSize <= expectedSizePerNode * 1.1D) {
    +          blockLst.add(block);
    +          if (LOGGER.isDebugEnabled()) {
    +            LOGGER.debug("Second Assignment iteration: "
    +                + ((TableBlockInfo) block).getFilePath() + "-"
    +                + ((TableBlockInfo) block).getBlockLength() + "-->currentNode");
    +          }
    +          fileSize += thisBlockSize;
    +          blocks.remove();
    +        }
    +      } else {
             break;
           }
         }
       }
     
    +  /**
    +   * allocate the blocks in round robin manner
    +   * @param node2Blocks
    +   * @param remainingBlocks
    +   * @param blockAssignmentStrategy
    +   */
    +  private static void assignBlocksUseRoundRobin(ArrayList<NodeMultiBlockRelation> node2Blocks,
    +      Set<Distributable> remainingBlocks, BlockAssignmentStrategy blockAssignmentStrategy) {
    +    switch (blockAssignmentStrategy) {
    +      case BLOCK_NUM_FIRST:
    +        roundRobinAssignBlocksByNum(node2Blocks, remainingBlocks);
    +        break;
    +      case BLOCK_SIZE_FIRST:
    +        roundRobinAssignBlocksBySize(node2Blocks, remainingBlocks);
    +        break;
    +      default:
    +        throw new IllegalArgumentException("Unsupported block assignment strategy: "
    +            + blockAssignmentStrategy);
    +    }
    +  }
    +
    +  private static void roundRobinAssignBlocksByNum(ArrayList<NodeMultiBlockRelation> outputMap,
    +      Set<Distributable> remainingBlocks) {
    +    for (NodeMultiBlockRelation relation: outputMap) {
    +      Iterator<Distributable> blocks = remainingBlocks.iterator();
    +      if (blocks.hasNext()) {
    +        Distributable block = blocks.next();
    +        List<Distributable> blockLst = relation.getBlocks();
    +        blockLst.add(block);
    +        blocks.remove();
    +      }
    +    }
    +  }
    +
    +  private static void roundRobinAssignBlocksBySize(ArrayList<NodeMultiBlockRelation> outputMap,
    +      Set<Distributable> remainingBlocks) {
    +    Iterator<Distributable> blocks = remainingBlocks.iterator();
    +    while (blocks.hasNext()) {
    +      // sort the allocated node-2-blocks in ascending order, the total data size of first one is
    +      // the smallest, so we assign this block to it.
    +      Collections.sort(outputMap, NodeMultiBlockRelation.DATA_SIZE_ASC_COMPARATOR);
    +      Distributable block = blocks.next();
    +      List<Distributable> blockLst = outputMap.get(0).getBlocks();
    +      blockLst.add(block);
    +      if (LOGGER.isDebugEnabled()) {
    +        LOGGER.debug("RoundRobin assignment iteration: "
    +            + ((TableBlockInfo) block).getFilePath() + "-"
    +            + ((TableBlockInfo) block).getBlockLength() + "-->" + outputMap.get(0).getNode());
    +      }
    +      blocks.remove();
    +    }
    +  }
       /**
        * To create the final output of the Node and Data blocks
        *
    -   * @param outputMap
    -   * @param blocksPerNode
    -   * @param uniqueBlocks
    -   * @param nodeAndBlockMapping
    +   * @param outputNode2Blocks
    +   * @param expectedSizePerNode
    +   * @param remainingBlocks
    +   * @param inputNode2Blocks
    --- End diff --
    
    Please remove the @param tags that have no description.


---

Mime
View raw message