ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject ignite git commit: IGNITE-3414: WIP on reducers. Local and remote assignment logic left.
Date Tue, 05 Jul 2016 10:15:32 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-3414 c683e5100 -> c9507ed4d


IGNITE-3414: WIP on reducers. Local and remote assignment logic left.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c9507ed4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c9507ed4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c9507ed4

Branch: refs/heads/ignite-3414
Commit: c9507ed4d77f5e53736031b7dbb725da4364549c
Parents: c683e51
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Tue Jul 5 13:15:25 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Tue Jul 5 13:15:25 2016 +0300

----------------------------------------------------------------------
 .../IgniteHadoopWeightedMapReducePlanner.java   | 93 ++++++++++++++++++--
 .../planner/HadoopMapReducePlanGroup.java       | 20 ++---
 2 files changed, 92 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c9507ed4/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
index fa896d9..8a99946 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
@@ -99,7 +99,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc
 
         Mappers mappers = assignMappers(splits, top);
 
-        Map<UUID, int[]> reducers = assignReducers(top, mappers, reducerCnt);
+        Map<UUID, int[]> reducers = assignReducers(splits, top, mappers, reducerCnt);
 
         return new HadoopDefaultMapReducePlan(mappers.nodeToSplits, reducers);
     }
@@ -243,7 +243,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc
         for (HadoopMapReducePlanGroup grp : top.groups()) {
             MapperPriority priority = groupPriority(grp, affIds, priorityAffId);
 
-            int weight = grp.mappersWeight() +
+            int weight = grp.weight() +
                 (priority == MapperPriority.NORMAL ? rmtMapperWeight : locMapperWeight);
 
             if (resGrp == null || weight < resWeight || weight == resWeight && priority.value() > resPrio.value()) {
@@ -256,7 +256,7 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc
         assert resGrp != null;
 
         // Update group weight for further runs.
-        resGrp.mappersWeight(resWeight);
+        resGrp.weight(resWeight);
 
         // Return the best node from the group.
         return bestMapperNodeForGroup(resGrp, resPrio, affIds, priorityAffId);
@@ -327,15 +327,16 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc
     /**
      * Generate reducers.
      *
+     * @param splits Input splits.
      * @param top Topology.
      * @param mappers Mappers.
      * @param reducerCnt Reducer count.
      * @return Reducers.
      * @throws IgniteCheckedException If failed.
      */
-    private Map<UUID, int[]> assignReducers(HadoopMapReducePlanTopology top, Mappers mappers, int reducerCnt)
-        throws IgniteCheckedException {
-        Map<UUID, Integer> reducers = assignReducers0(top, mappers, reducerCnt);
+    private Map<UUID, int[]> assignReducers(Collection<HadoopInputSplit> splits, HadoopMapReducePlanTopology top,
+        Mappers mappers, int reducerCnt) throws IgniteCheckedException {
+        Map<UUID, Integer> reducers = assignReducers0(top, splits, mappers, reducerCnt);
 
         int cnt = 0;
 
@@ -359,18 +360,92 @@ public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReduc
      * Generate reducers.
      *
      * @param top Topology.
+     * @param splits Input splits.
      * @param mappers Mappers.
      * @param reducerCnt Reducer count.
      * @return Reducers.
      * @throws IgniteCheckedException If failed.
      */
-    private Map<UUID, Integer> assignReducers0(HadoopMapReducePlanTopology top, Mappers mappers, int reducerCnt)
+    private Map<UUID, Integer> assignReducers0(HadoopMapReducePlanTopology top, Collection<HadoopInputSplit> splits,
+        Mappers mappers, int reducerCnt)
         throws IgniteCheckedException {
+        Map<UUID, Integer> res = new HashMap<>();
 
+        // Assign reducers to splits.
+        Map<HadoopInputSplit, Integer> splitToReducerCnt = assignReducersToSplits(splits, reducerCnt);
 
-        // TODO
+        // Assign as many local reducers as possible.
+        int remaining = 0;
 
-        return null;
+        for (Map.Entry<HadoopInputSplit, Integer> entry : splitToReducerCnt.entrySet()) {
+            HadoopInputSplit split = entry.getKey();
+            int cnt = entry.getValue();
+
+            int assigned = assignLocalReducers(split, cnt, res);
+
+            remaining += cnt - assigned;
+        }
+
+        // Assign the remaining reducers.
+        assignRemoteReducers(remaining, res);
+
+        return res;
+    }
+
+    /**
+     * Assign local split reducers.
+     *
+     * @param split Split.
+     * @param cnt Reducer count.
+     * @param resMap Reducers result map.
+     * @return Number of assigned reducers.
+     */
+    private int assignLocalReducers(HadoopInputSplit split, int cnt, Map<UUID, Integer> resMap) {
+        // TODO: Implement local reducer assignment.
+
+        return 0;
+    }
+
+    /**
+     * Assign remote reducers.
+     *
+     * @param cnt Count.
+     * @param resMap Reducers result map.
+     */
+    private void assignRemoteReducers(int cnt, Map<UUID, Integer> resMap) {
+        // TODO: Implement remote reducer assignment.
+    }
+
+    /**
+     * Distribute reducers between splits.
+     *
+     * @param splits Splits.
+     * @param reducerCnt Reducer count.
+     * @return Map from input split to reducer count.
+     */
+    private Map<HadoopInputSplit, Integer> assignReducersToSplits(Collection<HadoopInputSplit> splits,
+        int reducerCnt) {
+        Map<HadoopInputSplit, Integer> res = new IdentityHashMap<>(splits.size());
+
+        int base = reducerCnt / splits.size();
+
+        int remainder = reducerCnt - base * splits.size();
+
+        for (HadoopInputSplit split : splits) {
+            int val = base;
+
+            if (remainder > 0) {
+                val++;
+
+                remainder--;
+            }
+
+            res.put(split, val);
+        }
+
+        assert remainder == 0;
+
+        return res;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/c9507ed4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
index 0fc9373..ecd7126 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
@@ -39,8 +39,8 @@ public class HadoopMapReducePlanGroup {
     /** CPUs. */
     private final int cpus;
 
-    /** Mappers weight. */
-    private int mappersWeight;
+    /** Weight. */
+    private int weight;
 
     /**
      * Constructor.
@@ -116,21 +116,17 @@ public class HadoopMapReducePlanGroup {
     }
 
     /**
-     * Get mappers weight.
-     *
-     * @return Mappers weight.
+     * @return Weight.
      */
-    public int mappersWeight() {
-        return mappersWeight;
+    public int weight() {
+        return weight;
     }
 
     /**
-     * Set mappers weight.
-     *
-     * @param mappersWeight Mappers weight.
+     * @param weight Weight.
      */
-    public void mappersWeight(int mappersWeight) {
-        this.mappersWeight = mappersWeight;
+    public void weight(int weight) {
+        this.weight = weight;
     }
 
 


Mime
View raw message