carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gvram...@apache.org
Subject [1/2] incubator-carbondata git commit: CARBONDATA-153 Record count is not matching while loading the data when one data node went down in HA setup
Date Mon, 22 Aug 2016 14:47:15 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/branch-0.1 de373c965 -> 1f4bcdc97


CARBONDATA-153 Record count is not matching while loading the data when one data node went
down in HA setup


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/6997f87a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/6997f87a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/6997f87a

Branch: refs/heads/branch-0.1
Commit: 6997f87ae802585fe235e24efe6d0ab21ca1b269
Parents: de373c9
Author: mohammadshahidkhan <mohdshahidkhan1987@gmail.com>
Authored: Tue Aug 9 10:47:02 2016 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Mon Aug 22 19:35:22 2016 +0530

----------------------------------------------------------------------
 .../carbondata/spark/load/CarbonLoaderUtil.java | 55 ++++++++++++++------
 1 file changed, 39 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6997f87a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index f663b06..b934b1d 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -1118,7 +1118,7 @@ public final class CarbonLoaderUtil {
     createOutputMap(nodeBlocksMap, blocksPerNode, uniqueBlocks, nodeAndBlockMapping, activeNodes);
 
     // if any blocks remain then assign them to nodes in round robin.
-    assignLeftOverBlocks(nodeBlocksMap, uniqueBlocks, blocksPerNode);
+    assignLeftOverBlocks(nodeBlocksMap, uniqueBlocks, blocksPerNode, activeNodes);
 
     return nodeBlocksMap;
   }
@@ -1200,23 +1200,23 @@ public final class CarbonLoaderUtil {
    * @param uniqueBlocks
    */
   private static void assignLeftOverBlocks(Map<String, List<Distributable>> outputMap,
-      Set<Distributable> uniqueBlocks, int noOfBlocksPerNode) {
-
-    for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet())
{
-      Iterator<Distributable> blocks = uniqueBlocks.iterator();
-      List<Distributable> blockLst = entry.getValue();
-      //if the node is already having the per block nodes then avoid assign the extra blocks
-      if (blockLst.size() == noOfBlocksPerNode) {
-        continue;
-      }
-      while (blocks.hasNext()) {
-        Distributable block = blocks.next();
-        blockLst.add(block);
-        blocks.remove();
-        if (blockLst.size() >= noOfBlocksPerNode) {
-          break;
+      Set<Distributable> uniqueBlocks, int noOfBlocksPerNode, List<String> activeNodes)
{
+
+    if (activeNodes != null) {
+      for (String activeNode : activeNodes) {
+        List<Distributable> blockLst = outputMap.get(activeNode);
+        if (null == blockLst) {
+          blockLst = new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+          outputMap.put(activeNode, blockLst);
         }
+        populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
+      }
+    } else {
+      for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet())
{
+        List<Distributable> blockLst = entry.getValue();
+        populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
       }
+
     }
 
     for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet())
{
@@ -1231,6 +1231,29 @@ public final class CarbonLoaderUtil {
   }
 
   /**
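+   * Fills blockLst with blocks taken from uniqueBlocks, up to noOfBlocksPerNode entries.
+   * @param uniqueBlocks pool of unassigned blocks; assigned blocks are removed from it
+   * @param noOfBlocksPerNode number of blocks at which filling of blockLst stops
+   * @param blockLst list of blocks for one node, appended to in place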
+   */
+  private static void populateBlocks(Set<Distributable> uniqueBlocks, int noOfBlocksPerNode,
+      List<Distributable> blockLst) {
+    Iterator<Distributable> blocks = uniqueBlocks.iterator();
+    // if the node already holds noOfBlocksPerNode blocks, do not assign it any extra blocks
+    if (blockLst.size() == noOfBlocksPerNode) {
+      return;
+    }
+    while (blocks.hasNext()) {
+      Distributable block = blocks.next();
+      blockLst.add(block);
+      blocks.remove();
+      if (blockLst.size() >= noOfBlocksPerNode) {
+        break;
+      }
+    }
+  }
+
+  /**
    * To create the final output of the Node and Data blocks
    *
    * @param outputMap


Mime
View raw message